This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 3892fc417c2f7d07e15b78eae1e1b3dd09e60090 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Dec 30 13:46:22 2020 -0800 Rename partitionId to partitionGroupId --- .../segmentselector/RealtimeSegmentSelector.java | 2 +- .../apache/pinot/common/utils/LLCSegmentName.java | 24 +++++----- .../org/apache/pinot/common/utils/SegmentName.java | 2 +- .../pinot/common/utils/SegmentNameBuilderTest.java | 6 +-- .../helix/core/PinotHelixResourceManager.java | 5 ++- .../helix/core/PinotTableIdealStateBuilder.java | 15 ++++--- .../segment/RealtimeSegmentAssignment.java | 6 +-- .../RealtimeToOfflineSegmentsTaskGenerator.java | 4 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 51 ++++++++++++---------- .../SegmentSizeBasedFlushThresholdUpdater.java | 2 +- .../PinotLLCRealtimeSegmentManagerTest.java | 16 ++++--- .../realtime/LLRealtimeSegmentDataManager.java | 34 ++++++++------- .../manager/realtime/RealtimeTableDataManager.java | 13 +++--- .../realtime/LLRealtimeSegmentDataManagerTest.java | 10 ++--- .../fakestream/FakePartitionGroupMetadata.java | 48 ++++++++++++++++++++ .../impl/fakestream/FakeStreamConsumerFactory.java | 10 +---- .../fakestream/FakeStreamMetadataProvider.java | 15 ++++++- ...lakyConsumerRealtimeClusterIntegrationTest.java | 9 +--- ...PartitionLLCRealtimeClusterIntegrationTest.java | 6 +-- .../stream/kafka09/KafkaConsumerFactory.java | 9 +--- .../kafka09/KafkaPartitionGroupMetadata.java | 48 ++++++++++++++++++++ .../kafka09/KafkaStreamMetadataProvider.java | 26 +++++++++++ .../kafka09/KafkaPartitionLevelConsumerTest.java | 2 +- .../stream/kafka20/KafkaConsumerFactory.java | 9 +--- .../kafka20/KafkaPartitionGroupMetadata.java | 48 ++++++++++++++++++++ .../kafka20/KafkaStreamMetadataProvider.java | 21 +++++++++ ...her.java => PartitionGroupMetadataFetcher.java} | 18 +++++--- .../pinot/spi/stream/PartitionOffsetFetcher.java | 15 ++++--- .../pinot/spi/stream/StreamConsumerFactory.java | 8 +--- .../pinot/spi/stream/StreamMetadataProvider.java | 9 
+++- 30 files changed, 347 insertions(+), 144 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java index f462326..2d778c6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java @@ -95,7 +95,7 @@ public class RealtimeSegmentSelector implements SegmentSelector { if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { // Keep the first CONSUMING segment for each partition LLCSegmentName llcSegmentName = new LLCSegmentName(segment); - partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionId(), (k, consumingSegment) -> { + partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionGroupId(), (k, consumingSegment) -> { if (consumingSegment == null) { return llcSegmentName; } else { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java index adc24ad..a66bb3c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java @@ -26,7 +26,7 @@ import org.joda.time.DateTimeZone; public class LLCSegmentName extends SegmentName implements Comparable { private final static String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'"; private final String _tableName; - private final int _partitionId; + private final int _partitionGroupId; private final int _sequenceNumber; private final String _creationTime; private final String _segmentName; @@ -39,22 +39,22 @@ public class LLCSegmentName extends SegmentName implements Comparable { _segmentName = segmentName; String[] parts = 
StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); _tableName = parts[0]; - _partitionId = Integer.parseInt(parts[1]); + _partitionGroupId = Integer.parseInt(parts[1]); _sequenceNumber = Integer.parseInt(parts[2]); _creationTime = parts[3]; } - public LLCSegmentName(String tableName, int partitionId, int sequenceNumber, long msSinceEpoch) { + public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) { if (!isValidComponentName(tableName)) { throw new RuntimeException("Invalid table name " + tableName); } _tableName = tableName; - _partitionId = partitionId; + _partitionGroupId = partitionGroupId; _sequenceNumber = sequenceNumber; // ISO8601 date: 20160120T1234Z DateTime dateTime = new DateTime(msSinceEpoch, DateTimeZone.UTC); _creationTime = dateTime.toString(DATE_FORMAT); - _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime; + _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime; } /** @@ -75,13 +75,13 @@ public class LLCSegmentName extends SegmentName implements Comparable { } @Override - public int getPartitionId() { - return _partitionId; + public int getPartitionGroupId() { + return _partitionGroupId; } @Override public String getPartitionRange() { - return Integer.toString(getPartitionId()); + return Integer.toString(getPartitionGroupId()); } @Override @@ -110,9 +110,9 @@ public class LLCSegmentName extends SegmentName implements Comparable { throw new RuntimeException( "Cannot compare segment names " + this.getSegmentName() + " and " + other.getSegmentName()); } - if (this.getPartitionId() > other.getPartitionId()) { + if (this.getPartitionGroupId() > other.getPartitionGroupId()) { return 1; - } else if (this.getPartitionId() < other.getPartitionId()) { + } else if (this.getPartitionGroupId() < other.getPartitionGroupId()) { return -1; } else { if (this.getSequenceNumber() > 
other.getSequenceNumber()) { @@ -141,7 +141,7 @@ public class LLCSegmentName extends SegmentName implements Comparable { LLCSegmentName segName = (LLCSegmentName) o; - if (_partitionId != segName._partitionId) { + if (_partitionGroupId != segName._partitionGroupId) { return false; } if (_sequenceNumber != segName._sequenceNumber) { @@ -159,7 +159,7 @@ public class LLCSegmentName extends SegmentName implements Comparable { @Override public int hashCode() { int result = _tableName != null ? _tableName.hashCode() : 0; - result = 31 * result + _partitionId; + result = 31 * result + _partitionGroupId; result = 31 * result + _sequenceNumber; result = 31 * result + (_creationTime != null ? _creationTime.hashCode() : 0); result = 31 * result + (_segmentName != null ? _segmentName.hashCode() : 0); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java index 6763f6d..b0c00ae 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java @@ -63,7 +63,7 @@ public abstract class SegmentName { throw new RuntimeException("No groupId in " + getSegmentName()); } - public int getPartitionId() { + public int getPartitionGroupId() { throw new RuntimeException("No partitionId in " + getSegmentName()); } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java index f632f51..de606cc 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java @@ -58,7 +58,7 @@ public class SegmentNameBuilderTest { // Check partition range assertEquals(longNameSegment.getPartitionRange(), "0"); assertEquals(shortNameSegment.getPartitionRange(), 
"ALL"); - assertEquals(llcSegment.getPartitionId(), 0); + assertEquals(llcSegment.getPartitionGroupId(), 0); // Check groupId assertEquals(longNameSegment.getGroupId(), "myTable_REALTIME_1234567_0"); @@ -127,14 +127,14 @@ public class SegmentNameBuilderTest { LLCSegmentName segName1 = new LLCSegmentName(tableName, partitionId, sequenceNumber, msSinceEpoch); Assert.assertEquals(segName1.getSegmentName(), segmentName); - Assert.assertEquals(segName1.getPartitionId(), partitionId); + Assert.assertEquals(segName1.getPartitionGroupId(), partitionId); Assert.assertEquals(segName1.getCreationTime(), creationTime); Assert.assertEquals(segName1.getSequenceNumber(), sequenceNumber); Assert.assertEquals(segName1.getTableName(), tableName); LLCSegmentName segName2 = new LLCSegmentName(segmentName); Assert.assertEquals(segName2.getSegmentName(), segmentName); - Assert.assertEquals(segName2.getPartitionId(), partitionId); + Assert.assertEquals(segName2.getPartitionGroupId(), partitionId); Assert.assertEquals(segName2.getCreationTime(), creationTime); Assert.assertEquals(segName2.getSequenceNumber(), sequenceNumber); Assert.assertEquals(segName2.getTableName(), tableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index fa117fa..a04e0bc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -130,6 +130,7 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import 
org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -1396,6 +1397,8 @@ public class PinotHelixResourceManager { */ private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) { StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + StreamMetadataProvider streamMetadataProvider = streamConsumerFactory + .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis()); // get current partition groups and their metadata - this will be empty when creating the table List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState); @@ -1403,7 +1406,7 @@ public class PinotHelixResourceManager { // get new partition groups and their metadata, // Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard) List<PartitionGroupMetadata> newPartitionGroupMetadataList = - streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList); + streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 5000); // setup segment zk metadata and ideal state for all the new found partition groups _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index a564542..1e95966 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -32,7 +32,8 @@ 
import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.stream.PartitionCountFetcher; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -115,13 +116,15 @@ public class PinotTableIdealStateBuilder { pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); } - public static int getPartitionCount(StreamConfig streamConfig) { - PartitionCountFetcher partitionCountFetcher = new PartitionCountFetcher(streamConfig); + public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = + new PartitionGroupMetadataFetcher(streamConfig, currentPartitionGroupMetadataList); try { - RetryPolicies.noDelayRetryPolicy(3).attempt(partitionCountFetcher); - return partitionCountFetcher.getPartitionCount(); + RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupMetadataFetcher); + return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); } catch (Exception e) { - Exception fetcherException = partitionCountFetcher.getException(); + Exception fetcherException = partitionGroupMetadataFetcher.getException(); LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException); throw new RuntimeException(fetcherException); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index a069734..e27958f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -136,7 +136,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { * Helper method to assign instances for CONSUMING segment based on the segment partition id and instance partitions. */ private List<String> assignConsumingSegment(String segmentName, InstancePartitions instancePartitions) { - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId(); int numReplicaGroups = instancePartitions.getNumReplicaGroups(); if (numReplicaGroups == 1) { @@ -325,7 +325,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>(); for (String segmentName : currentAssignment.keySet()) { - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId(); partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName); } @@ -360,7 +360,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment { // Replica-group based assignment // Uniformly spray the segment partitions over the instance partitions - int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionId(); + int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionGroupId(); int numPartitions = instancePartitions.getNumPartitions(); int partitionId = segmentPartitionId % numPartitions; return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId); 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java index a278396..8208d8e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java @@ -252,11 +252,11 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>(); for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) { LLCSegmentName llcSegmentName = new LLCSegmentName(metadata.getSegmentName()); - allPartitions.add(llcSegmentName.getPartitionId()); + allPartitions.add(llcSegmentName.getPartitionGroupId()); if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) { completedSegmentsMetadataList.add(metadata); - latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> { + latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> { if (latestLLCSegmentName == null) { return llcSegmentName; } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 8a29489..189be8b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -83,6 +83,7 @@ import 
org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -186,8 +187,10 @@ public class PinotLLCRealtimeSegmentManager { // get new partition groups (honor any groupings which are already consuming - [0], [1], [2]) StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); + StreamMetadataProvider streamMetadataProvider = streamConsumerFactory + .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis()); List<PartitionGroupMetadata> newPartitionGroupMetadataList = - streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList); + streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000); // from the above list, remove the partition groups which are already CONSUMING // i.e. newPartitionGroups - currentPartitionGroups. 
Therefore, ([0], [1], [2]) - ([1], [2]) = ([0]) @@ -292,7 +295,8 @@ public class PinotLLCRealtimeSegmentManager { PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); - int numPartitions = getNumPartitions(streamConfig); + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); + int numPartitionGroups = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size(); int numReplicas = getNumReplicas(tableConfig, instancePartitions); SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); @@ -301,9 +305,9 @@ public class PinotLLCRealtimeSegmentManager { long currentTimeMs = getCurrentTimeMs(); Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); - for (int partitionId = 0; partitionId < numPartitions; partitionId++) { + for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups; partitionGroupId++) { String segmentName = - setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions, + setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, instancePartitionsMap); @@ -635,7 +639,7 @@ public class PinotLLCRealtimeSegmentManager { // Add the partition metadata if available SegmentPartitionMetadata partitionMetadata = - getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId()); + getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId()); if (partitionMetadata != null) { 
newSegmentZKMetadata.setPartitionMetadata(partitionMetadata); } @@ -705,22 +709,23 @@ public class PinotLLCRealtimeSegmentManager { } @VisibleForTesting - int getNumPartitions(StreamConfig streamConfig) { - return PinotTableIdealStateBuilder.getPartitionCount(streamConfig); + List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList); } @VisibleForTesting StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, - int partitionId) { + int partitionGroupId) { PartitionOffsetFetcher partitionOffsetFetcher = - new PartitionOffsetFetcher(offsetCriteria, partitionId, streamConfig); + new PartitionOffsetFetcher(offsetCriteria, partitionGroupId, streamConfig); try { RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher); return partitionOffsetFetcher.getOffset(); } catch (Exception e) { throw new IllegalStateException(String .format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s", - streamConfig.getTopicName(), partitionId, offsetCriteria), e); + streamConfig.getTopicName(), partitionGroupId, offsetCriteria), e); } } @@ -768,7 +773,7 @@ public class PinotLLCRealtimeSegmentManager { Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>(); for (String segmentName : segments) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); - latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> { + latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> { if (latestLLCSegmentName == null) { return llcSegmentName; } else { @@ -821,10 +826,12 @@ public class PinotLLCRealtimeSegmentManager { Preconditions.checkState(!_isStopping, "Segment manager is 
stopping"); String realtimeTableName = tableConfig.getTableName(); - int numPartitions = getNumPartitions(streamConfig); HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; if (idealState.isEnabled()) { + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = + getCurrentPartitionGroupMetadataList(idealState); + int numPartitions = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size(); return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions); } else { LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName); @@ -1085,7 +1092,7 @@ public class PinotLLCRealtimeSegmentManager { String previousConsumingSegment = null; for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey()); - if (llcSegmentName.getPartitionId() == partitionId && segmentEntry.getValue() + if (llcSegmentName.getPartitionGroupId() == partitionId && segmentEntry.getValue() .containsValue(SegmentStateModel.CONSUMING)) { previousConsumingSegment = llcSegmentName.getSegmentName(); break; @@ -1110,7 +1117,7 @@ public class PinotLLCRealtimeSegmentManager { for (int partitionId = 0; partitionId < numPartitions; partitionId++) { if (!latestSegmentZKMetadataMap.containsKey(partitionId)) { String newSegmentName = - setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions, + setupNewPartitionGroup(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions, numReplicas); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap); @@ -1121,7 +1128,7 @@ public class PinotLLCRealtimeSegmentManager { } private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) { - 
return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionId(), + return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(), lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs); } @@ -1129,21 +1136,21 @@ public class PinotLLCRealtimeSegmentManager { * Sets up a new partition. * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name. */ - private String setupNewPartition(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionId, - long creationTimeMs, InstancePartitions instancePartitions, int numPartitions, int numReplicas) { + private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionGroupId, + long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) { String realtimeTableName = tableConfig.getTableName(); - LOGGER.info("Setting up new partition: {} for table: {}", partitionId, realtimeTableName); + LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); LLCSegmentName newLLCSegmentName = - new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs); + new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs); String newSegmentName = newLLCSegmentName.getSegmentName(); StreamPartitionMsgOffset startOffset = - getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId); + getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionGroupId); CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset.toString(), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs, - committingSegmentDescriptor, null, 
instancePartitions, numPartitions, numReplicas); + committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas); return newSegmentName; } @@ -1157,7 +1164,7 @@ public class PinotLLCRealtimeSegmentManager { int numPartitions = 0; for (String segmentName : idealState.getRecord().getMapFields().keySet()) { if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) { - numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionId() + 1); + numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1); } } return numPartitions; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java index 2e73806..56ae29e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java @@ -102,7 +102,7 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda // less same characteristics at any one point in time). // However, when we start a new table or change controller mastership, we can have any partition completing first. // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value. 
- if (new LLCSegmentName(newSegmentName).getPartitionId() == 0 || _latestSegmentRowsToSizeRatio == 0) { + if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) { if (_latestSegmentRowsToSizeRatio > 0) { _latestSegmentRowsToSizeRatio = CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 4888f17..743e719 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; @@ -48,6 +50,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.util.SegmentCompletionUtils; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; +import org.apache.pinot.core.realtime.impl.fakestream.FakePartitionGroupMetadata; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; @@ -57,6 +60,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import 
org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -333,7 +337,7 @@ public class PinotLLCRealtimeSegmentManagerTest { assertTrue(oldSegmentZKMetadataMap.containsKey(segmentName)); assertTrue(segmentZKMetadataMap.containsKey(segmentName)); assertEquals(segmentZKMetadataMap.get(segmentName), oldSegmentZKMetadataMap.get(segmentName)); - oldNumPartitions = Math.max(oldNumPartitions, new LLCSegmentName(segmentName).getPartitionId() + 1); + oldNumPartitions = Math.max(oldNumPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1); } // Check that for new partitions, each partition should have exactly 1 new segment in CONSUMING state, and metadata @@ -341,7 +345,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>(); for (Map.Entry<String, Map<String, String>> entry : instanceStatesMap.entrySet()) { String segmentName = entry.getKey(); - int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId(); partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName); } for (int partitionId = oldNumPartitions; partitionId < segmentManager._numPartitions; partitionId++) { @@ -579,7 +583,7 @@ public class PinotLLCRealtimeSegmentManagerTest { if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap.containsValue( SegmentStateModel.CONSUMING)) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); - int partitionsId = llcSegmentName.getPartitionId(); + int partitionsId = llcSegmentName.getPartitionGroupId(); Map<Integer, String> sequenceNumberToSegmentMap = 
partitionIdToSegmentsMap.get(partitionsId); int sequenceNumber = llcSegmentName.getSequenceNumber(); assertFalse(sequenceNumberToSegmentMap.containsKey(sequenceNumber)); @@ -910,12 +914,12 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Override - int getNumPartitions(StreamConfig streamConfig) { - return _numPartitions; + List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { + return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList()); } @Override - LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionId) { + LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionGroupId) { // The criteria for this test should always be SMALLEST (for default streaming config and new added partitions) assertTrue(offsetCriteria.isSmallest()); return PARTITION_OFFSET; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 84d4592..13a9ab2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -214,7 +215,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Semaphore for each partitionId only, which is to prevent two different Kafka consumers // from consuming with the same partitionId in parallel in the same host. 
// See the comments in {@link RealtimeTableDataManager}. - private final Semaphore _partitionConsumerSemaphore; + private final Semaphore _partitionGroupConsumerSemaphore; // A boolean flag to check whether the current thread has acquired the semaphore. // This boolean is needed because the semaphore is shared by threads; every thread holding this semaphore can // modify the permit. This boolean make sure the semaphore gets released only once when the partition stops consuming. @@ -247,7 +248,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private Thread _consumerThread; private final String _streamTopic; - private final int _streamPartitionId; + private final int _partitionGroupId; final String _clientId; private final LLCSegmentName _llcSegmentName; private final RecordTransformer _recordTransformer; @@ -705,7 +706,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { @Override public Map<String, String> getPartitionToCurrentOffset() { Map<String, String> partitionToCurrentOffset = new HashMap<>(); - partitionToCurrentOffset.put(String.valueOf(_streamPartitionId), _currentOffset.toString()); + partitionToCurrentOffset.put(String.valueOf(_partitionGroupId), _currentOffset.toString()); return partitionToCurrentOffset; } @@ -730,8 +731,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } @VisibleForTesting - protected Semaphore getPartitionConsumerSemaphore() { - return _partitionConsumerSemaphore; + protected Semaphore getPartitionGroupConsumerSemaphore() { + return _partitionGroupConsumerSemaphore; } @VisibleForTesting @@ -892,7 +893,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { closePartitionLevelConsumer(); closeStreamMetadataProvider(); if (_acquiredConsumerSemaphore.compareAndSet(true, false)) { - _partitionConsumerSemaphore.release(); + _partitionGroupConsumerSemaphore.release(); } } @@ -1102,7 +1103,7 @@ public class 
LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here. public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, - Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics, + Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) { _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore(); _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata; @@ -1129,10 +1130,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _streamTopic = _partitionLevelStreamConfig.getTopicName(); _segmentNameStr = _segmentZKMetadata.getSegmentName(); _llcSegmentName = llcSegmentName; - _streamPartitionId = _llcSegmentName.getPartitionId(); - _partitionConsumerSemaphore = partitionConsumerSemaphore; + _partitionGroupId = _llcSegmentName.getPartitionGroupId(); + _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; _acquiredConsumerSemaphore = new AtomicBoolean(false); - _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId; + _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId; segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); _tableStreamName = _tableNameWithType + "_" + _streamTopic; _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr, @@ -1210,14 +1211,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Create message decoder Set<String> 
fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema); _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead); - _clientId = _streamTopic + "-" + _streamPartitionId; + _clientId = _streamTopic + "-" + _partitionGroupId; // Create record transformer _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema); // Acquire semaphore to create Kafka consumers try { - _partitionConsumerSemaphore.acquire(); + _partitionGroupConsumerSemaphore.acquire(); _acquiredConsumerSemaphore.set(true); } catch (InterruptedException e) { String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore"; @@ -1243,7 +1244,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // long as the partition function is not changed. int numPartitions = columnPartitionConfig.getNumPartitions(); try { - int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L); + // fixme: get this from ideal state + int numStreamPartitions = _streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000).size(); if (numStreamPartitions != numPartitions) { segmentLogger.warn( "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", @@ -1261,7 +1263,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn); realtimeSegmentConfigBuilder .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions)); - realtimeSegmentConfigBuilder.setPartitionId(_streamPartitionId); + realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId); } else { segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet()); } @@ -1313,7 +1315,7 @@ public class LLRealtimeSegmentDataManager 
extends RealtimeSegmentDataManager { closePartitionLevelConsumer(); } segmentLogger.info("Creating new stream consumer, reason: {}", reason); - _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _streamPartitionId); + _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _partitionGroupId); } /** @@ -1325,7 +1327,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { closeStreamMetadataProvider(); } segmentLogger.info("Creating new stream metadata provider, reason: {}", reason); - _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _streamPartitionId); + _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId); } // This should be done during commit? We may not always commit when we build a segment.... diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 33283b9..9850048 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.apache.helix.model.IdealState; import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; @@ -89,7 +90,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) consuming from the same stream 
partition can lead to bugs. // The semaphores will stay in the hash map even if the consuming partitions move to a different host. // We expect that there will be a small number of semaphores, but that may be ok. - private final Map<Integer, Semaphore> _partitionIdToSemaphoreMap = new ConcurrentHashMap<>(); + private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>(); // The old name of the stats file used to be stats.ser which we changed when we moved all packages // from com.linkedin to org.apache because of not being able to deserialize the old files using the newer classes @@ -274,7 +275,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { llcSegmentName = new LLCSegmentName(segmentName); if (_tableUpsertMetadataManager != null) { partitionUpsertMetadataManager = - _tableUpsertMetadataManager.getOrCreatePartitionManager(llcSegmentName.getPartitionId()); + _tableUpsertMetadataManager.getOrCreatePartitionManager(llcSegmentName.getPartitionGroupId()); } } @@ -307,11 +308,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } // Generates only one semaphore for every partitionId - int partitionId = llcSegmentName.getPartitionId(); - _partitionIdToSemaphoreMap.putIfAbsent(partitionId, new Semaphore(1)); + int partitionGroupId = llcSegmentName.getPartitionGroupId(); + _partitionGroupIdToSemaphoreMap.putIfAbsent(partitionGroupId, new Semaphore(1)); manager = new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(), - indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(partitionId), _serverMetrics, + indexLoadingConfig, schema, llcSegmentName, _partitionGroupIdToSemaphoreMap.get(partitionGroupId), _serverMetrics, partitionUpsertMetadataManager); } _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName); @@ -336,7 +337,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { 
columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName)); int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs(); String segmentName = immutableSegment.getSegmentName(); - int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionId(); + int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionGroupId(); PartitionUpsertMetadataManager partitionUpsertMetadataManager = _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId); int numPrimaryKeyColumns = _primaryKeyColumns.size(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index 0017c43..d09bdeb 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -719,7 +719,7 @@ public class LLRealtimeSegmentDataManagerTest { long timeout = 10_000L; FakeLLRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager(); Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get()); - Semaphore firstSemaphore = firstSegmentDataManager.getPartitionConsumerSemaphore(); + Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore(); Assert.assertEquals(firstSemaphore.availablePermits(), 0); Assert.assertFalse(firstSemaphore.hasQueuedThreads()); @@ -751,18 +751,18 @@ public class LLRealtimeSegmentDataManagerTest { "Failed to acquire the semaphore for the second segment manager in " + timeout + "ms"); Assert.assertTrue(secondSegmentDataManager.get().getAcquiredConsumerSemaphore().get()); - Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionConsumerSemaphore(); + Semaphore 
secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore(); Assert.assertEquals(firstSemaphore, secondSemaphore); Assert.assertEquals(secondSemaphore.availablePermits(), 0); Assert.assertFalse(secondSemaphore.hasQueuedThreads()); // Call destroy method the 2nd time on the first segment manager, the permits in semaphore won't increase. firstSegmentDataManager.destroy(); - Assert.assertEquals(firstSegmentDataManager.getPartitionConsumerSemaphore().availablePermits(), 0); + Assert.assertEquals(firstSegmentDataManager.getPartitionGroupConsumerSemaphore().availablePermits(), 0); // The permit finally gets released in the Semaphore. secondSegmentDataManager.get().destroy(); - Assert.assertEquals(secondSegmentDataManager.get().getPartitionConsumerSemaphore().availablePermits(), 1); + Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1); } public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager { @@ -800,7 +800,7 @@ public class LLRealtimeSegmentDataManagerTest { throws Exception { super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName, - semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics, + semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, new PartitionUpsertMetadataManager("testTable_REALTIME", 0, serverMetrics)); _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state"); _state.setAccessible(true); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java new file mode 100644 index 0000000..78ee12c --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java @@ -0,0 
+1,48 @@ +package org.apache.pinot.core.realtime.impl.fakestream; + +import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; + + +public class FakePartitionGroupMetadata implements PartitionGroupMetadata { + + private final int _groupId; + public FakePartitionGroupMetadata(int groupId) { + _groupId = groupId; + } + + @Override + public int getGroupId() { + return _groupId; + } + + @Override + public Checkpoint getStartCheckpoint() { + return null; + } + + @Override + public Checkpoint getEndCheckpoint() { + return null; + } + + @Override + public void setStartCheckpoint(Checkpoint startCheckpoint) { + + } + + @Override + public void setEndCheckpoint(Checkpoint endCheckpoint) { + + } + + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public PartitionGroupMetadata deserialize(byte[] blob) { + return null; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java index 9669223..289b226 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.realtime.impl.fakestream; -import java.util.List; import java.util.Set; import org.apache.pinot.core.util.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; @@ -69,14 +68,9 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { return new FakeStreamMetadataProvider(_streamConfig); } - @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { - return null; - } @Override - public PartitionGroupConsumer createConsumer(PartitionGroupMetadata 
metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { return null; } @@ -93,7 +87,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory { // stream metadata provider StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId); - int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000); + int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(null, 10_000).size(); System.out.println(partitionCount); // Partition metadata provider diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java index e0b8ebd..c96d06a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java @@ -19,9 +19,12 @@ package org.apache.pinot.core.realtime.impl.fakestream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -31,7 +34,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; * StreamMetadataProvider implementation for the fake stream */ public class FakeStreamMetadataProvider implements StreamMetadataProvider { - private int _numPartitions; + private final int _numPartitions; public FakeStreamMetadataProvider(StreamConfig streamConfig) { _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig); @@ 
-42,6 +45,16 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider { return _numPartitions; } + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) { + List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); + for (int i = 0; i < _numPartitions; i++) { + partitionGroupMetadataList.add(new FakePartitionGroupMetadata(i)); + } + return partitionGroupMetadataList; + } + public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException { throw new UnsupportedOperationException("This method is deprecated"); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java index 808a464..d917d73 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java @@ -19,7 +19,6 @@ package org.apache.pinot.integration.tests; import java.lang.reflect.Constructor; -import java.util.List; import java.util.Random; import java.util.Set; import org.apache.pinot.spi.data.readers.GenericRow; @@ -122,13 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster } @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { - return null; - } - - @Override - public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { return null; } } diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java index cd4f9b3..0196bde 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java @@ -165,7 +165,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust assertNotNull(columnPartitionMetadata); assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur")); assertEquals(columnPartitionMetadata.getNumPartitions(), 2); - int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId(); + int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId(); assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(streamPartitionId)); numSegmentsForPartition[streamPartitionId]++; } @@ -236,7 +236,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust assertNotNull(columnPartitionMetadata); assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur")); assertEquals(columnPartitionMetadata.getNumPartitions(), 2); - int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId(); + int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId(); numSegmentsForPartition[streamPartitionId]++; if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) { @@ -313,7 +313,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust assertNotNull(columnPartitionMetadata); 
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur")); assertEquals(columnPartitionMetadata.getNumPartitions(), 2); - int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId(); + int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId(); numSegmentsForPartition[streamPartitionId]++; if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java index b8ed19d..82c282c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.plugin.stream.kafka09; -import java.util.List; import java.util.Set; import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupMetadata; @@ -55,13 +54,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { } @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { - return null; - } - - @Override - public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { return null; } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java new file mode 100644 index 0000000..1d792ac --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java @@ -0,0 +1,48 @@ +package org.apache.pinot.plugin.stream.kafka09; + +import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; + + +public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata { + + private final int _groupId; + public KafkaPartitionGroupMetadata(int partitionId) { + _groupId = partitionId; + } + + @Override + public int getGroupId() { + return _groupId; + } + + @Override + public Checkpoint getStartCheckpoint() { + return null; + } + + @Override + public Checkpoint getEndCheckpoint() { + return null; + } + + @Override + public void setStartCheckpoint(Checkpoint startCheckpoint) { + + } + + @Override + public void setEndCheckpoint(Checkpoint endCheckpoint) { + + } + + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public PartitionGroupMetadata deserialize(byte[] blob) { + return null; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java index 06ee697..865ae96 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java @@ -22,9 +22,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; 
import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetRequest; @@ -36,6 +40,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -84,7 +89,12 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen * @return */ @Override + @Deprecated public synchronized int fetchPartitionCount(long timeoutMillis) { + return fetchPartitionCountInternal(timeoutMillis); + } + + private int fetchPartitionCountInternal(long timeoutMillis) { int unknownTopicReplyCount = 0; final int MAX_UNKNOWN_TOPIC_REPLY_COUNT = 10; int kafkaErrorCount = 0; @@ -145,6 +155,22 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen throw new TimeoutException(); } + /** + * Fetch the partition group metadata list + * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. 
+ * Hence current partition groups are not needed to compute the new partition groups + */ + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) { + int partitionCount = fetchPartitionCountInternal(timeoutMillis); + List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount); + for (int i = 0; i < partitionCount; i++) { + partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i)); + } + return partitionGroupMetadataList; + } + public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws java.util.concurrent.TimeoutException { throw new UnsupportedOperationException("The use of this method s not supported"); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java index beb82e5..fbdfdfb 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java @@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest { KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory); - Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2); + Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(null, 10000L).size(), 2); } @Test diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java index 806baff..c73aacb 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.plugin.stream.kafka20; -import java.util.List; import java.util.Set; import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupMetadata; @@ -52,13 +51,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory { } @Override - public List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata) { - return null; - } - - @Override - public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) { return null; } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java new file mode 100644 index 0000000..31ae75a --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java @@ -0,0 +1,48 @@ +package org.apache.pinot.plugin.stream.kafka20; + +import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; + + +public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata { + + private final int _groupId; + public KafkaPartitionGroupMetadata(int partitionId) { + _groupId = partitionId; 
+ } + + @Override + public int getGroupId() { + return _groupId; + } + + @Override + public Checkpoint getStartCheckpoint() { + return null; + } + + @Override + public Checkpoint getEndCheckpoint() { + return null; + } + + @Override + public void setStartCheckpoint(Checkpoint startCheckpoint) { + + } + + @Override + public void setEndCheckpoint(Checkpoint endCheckpoint) { + + } + + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public PartitionGroupMetadata deserialize(byte[] blob) { + return null; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index c0e2041..187c61b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -21,11 +21,15 @@ package org.apache.pinot.plugin.stream.kafka20; import com.google.common.base.Preconditions; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; @@ -42,10 +46,27 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa } @Override + @Deprecated public int 
fetchPartitionCount(long timeoutMillis) { return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); } + /** + * Fetch the partitionGroupMetadata list. + * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. + * Hence current partition groups are not needed to compute the new partition groups + */ + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList( + @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) { + int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); + List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount); + for (int i = 0; i < partitionCount; i++) { + partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i)); + } + return partitionGroupMetadataList; + } + public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws java.util.concurrent.TimeoutException { throw new UnsupportedOperationException("The use of this method is not supported"); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java similarity index 74% rename from pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index d523235..e1ce1a6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.spi.stream; +import java.util.List; import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,24 +27,27 @@ import org.slf4j.LoggerFactory; /** * Fetches the partition 
count of a stream using the {@link StreamMetadataProvider} */ -public class PartitionCountFetcher implements Callable<Boolean> { +public class PartitionGroupMetadataFetcher implements Callable<Boolean> { - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionCountFetcher.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); private int _partitionCount = -1; + private List<PartitionGroupMetadata> _partitionGroupMetadataList; + private List<PartitionGroupMetadata> _currentPartitionGroupMetadata; private final StreamConfig _streamConfig; private StreamConsumerFactory _streamConsumerFactory; private Exception _exception; private final String _topicName; - public PartitionCountFetcher(StreamConfig streamConfig) { + public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { _streamConfig = streamConfig; _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig); _topicName = streamConfig.getTopicName(); + _currentPartitionGroupMetadata = currentPartitionGroupMetadataList; } - public int getPartitionCount() { - return _partitionCount; + public List<PartitionGroupMetadata> getPartitionGroupMetadataList() { + return _partitionGroupMetadataList; } public Exception getException() { @@ -59,10 +63,10 @@ public class PartitionCountFetcher implements Callable<Boolean> { public Boolean call() throws Exception { - String clientId = PartitionCountFetcher.class.getSimpleName() + "-" + _topicName; + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName; try ( StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) { - _partitionCount = streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L); + _partitionGroupMetadataList = streamMetadataProvider.getPartitionGroupMetadataList(_currentPartitionGroupMetadata, 
/*maxWaitTimeMs=*/5000L); if (_exception != null) { // We had at least one failure, but succeeded now. Log an info - LOGGER.info("Successfully retrieved partition count as {} for topic {}", _partitionCount, _topicName); + LOGGER.info("Successfully retrieved partition group metadata list of size {} for topic {}", + _partitionGroupMetadataList.size(), _topicName); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java index 1d50160..b92f04d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java @@ -33,16 +33,16 @@ public class PartitionOffsetFetcher implements Callable<Boolean> { private final String _topicName; private final OffsetCriteria _offsetCriteria; - private final int _partitionId; + private final int _partitionGroupId; private Exception _exception = null; private StreamPartitionMsgOffset _offset; private StreamConsumerFactory _streamConsumerFactory; StreamConfig _streamConfig; - public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionId, StreamConfig streamConfig) { + public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionGroupId, StreamConfig streamConfig) { _offsetCriteria = offsetCriteria; - _partitionId = partitionId; + _partitionGroupId = partitionGroupId; _streamConfig = streamConfig; _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); _topicName = streamConfig.getTopicName(); @@ -64,18 +64,19 @@ public class PartitionOffsetFetcher implements Callable<Boolean> { @Override public Boolean call() throws Exception { - String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionId; + String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionGroupId; try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory - .createPartitionMetadataProvider(clientId, _partitionId)) { + 
.createPartitionMetadataProvider(clientId, _partitionGroupId)) { _offset = streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria, STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS); if (_exception != null) { LOGGER.info("Successfully retrieved offset({}) for stream topic {} partition {}", _offset, _topicName, - _partitionId); + _partitionGroupId); } return Boolean.TRUE; } catch (TransientConsumerException e) { - LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName, _partitionId, + LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName, + _partitionGroupId, e.getMessage()); _exception = e; return Boolean.FALSE; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index 4db0fb1..9caf61b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.spi.stream; -import java.util.List; import java.util.Set; @@ -42,6 +41,7 @@ public abstract class StreamConsumerFactory { * @param partition the partition id of the partition for which this consumer is being created * @return */ + @Deprecated public abstract PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition); /** @@ -74,10 +74,6 @@ public abstract class StreamConsumerFactory { return new LongMsgOffsetFactory(); } - // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state - public abstract List<PartitionGroupMetadata> getPartitionGroupMetadataList( - List<PartitionGroupMetadata> currentPartitionGroupsMetadata); - // creates a consumer which consumes from a partition group - public abstract PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata); 
+ public abstract PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 557ffc4..5b9104e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.stream; import java.io.Closeable; +import java.util.List; import javax.annotation.Nonnull; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; @@ -32,11 +33,15 @@ import org.apache.pinot.spi.annotations.InterfaceStability; public interface StreamMetadataProvider extends Closeable { /** * Fetches the number of partitions for a topic given the stream configs - * @param timeoutMillis - * @return + * @deprecated use getPartitionGroupMetadataList instead */ + @Deprecated int fetchPartitionCount(long timeoutMillis); + // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state + List<PartitionGroupMetadata> getPartitionGroupMetadataList( + List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis); + // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0 @Deprecated long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org