This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d7d04081131363e582340dfd8dc11fc1a92f3e5a
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Wed Jan 6 18:21:20 2021 -0800

    Use shardId's last digits as partitionGroupId
---
 .../helix/core/PinotHelixResourceManager.java    |  4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java | 10 +++
 .../kinesis/KinesisStreamMetadataProvider.java   | 79 +++++++++++++++++-----
 .../pinot/spi/stream/StreamMetadataProvider.java |  3 +-
 4 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b50da5f..d1c8755 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1374,8 +1374,8 @@ public class PinotHelixResourceManager {
       // (unless there are low-level segments already present)
       if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
         PinotTableIdealStateBuilder
-            .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
-                _enableBatchMessageMode);
+            .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig,
+                idealState, _enableBatchMessageMode);
         LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
       } else {
         LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 61ef719..bbd1ef3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -519,6 +519,11 @@ public class PinotLLCRealtimeSegmentManager {
       PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
       if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
         // make new segment
+        // FIXME: flushThreshold of segment is actually (configured threshold/numPartitions)
+        // In Kinesis, with every split/merge, we get new partitions, and an old partition gets deactivated.
+        // However, the getPartitionGroupInfo call returns ALL shards, regardless of whether they're active or not.
+        // So our numPartitions will forever keep increasing.
+        // TODO: can the getPartitionGroupInfo return the active partitions only, based on the checkpoints passed in current?
         String newLLCSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
                 instancePartitions, numPartitions, numReplicas);
@@ -534,6 +539,11 @@ public class PinotLLCRealtimeSegmentManager {
         createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
             committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
         newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+
+        // FIXME: a new CONSUMING segment is created even if EOL for this shard has been reached.
+        // the logic in getPartitionGroupInfo to prevent returning of EOLed shards isn't working
+        // OPTION: Since consumer knows about it, it can pass param in request/committingSegmentDescriptor "isEndOfShard"
+        // We can set that in metadata for validation manager to skip these partitions
       }
     }
   }
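A quick illustration of the flush-threshold concern called out in the FIXME above: the per-segment flush threshold is the configured table threshold divided by numPartitions, and because getPartitionGroupInfo keeps returning deactivated parent shards after a split/merge, the divisor only grows. The sketch below is hypothetical (the threshold and shard counts are made up, and FlushThresholdExample is not part of this commit); it only shows the arithmetic, not Pinot's actual flush-threshold code.

    // Hypothetical numbers, for illustration only.
    public class FlushThresholdExample {
      public static void main(String[] args) {
        int configuredThreshold = 100_000; // table-level flush threshold, in rows
        int activeShards = 4;              // shards currently receiving writes
        int allShardsReturned = 6;         // active shards plus deactivated parents after one split
        System.out.println(configuredThreshold / activeShards);      // 25000 rows per segment
        System.out.println(configuredThreshold / allShardsReturned); // 16666 rows per segment, and shrinking with every split/merge
      }
    }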
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index f86d06c..6c55a18 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -6,7 +6,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -16,44 +18,85 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 
 public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
-  private final KinesisConfig _kinesisConfig;
-  private KinesisConnectionHandler _kinesisConnectionHandler;
+  private final KinesisConnectionHandler _kinesisConnectionHandler;
 
   public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
-    _kinesisConfig = kinesisConfig;
     _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
   }
 
   @Override
   public int fetchPartitionCount(long timeoutMillis) {
-    return 0;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
-      throws TimeoutException {
-    return 0;
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
-      throws TimeoutException {
-    List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
+      throws IOException {
+
+    Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
+        currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
+
+    List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
-    for (Shard shard : shards) {
-      Map<String, String> shardToSequenceNumMap = new HashMap<>();
-      shardToSequenceNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceNumMap);
-      partitionGroupInfos
-          .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()), kinesisCheckpoint.serialize()));
+    for (Shard shard : shards) { // go over all shards
+      String shardId = shard.shardId();
+      int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
+      PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
+      KinesisCheckpoint newStartCheckpoint;
+      if (currentPartitionGroupMetadata != null) { // existing shard
+        KinesisCheckpoint currentEndCheckpoint = null;
+        try {
+          currentEndCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint());
+        } catch (Exception e) {
+          // ignore. No end checkpoint yet for IN_PROGRESS segment
+        }
+        if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
+          String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+          if (endingSequenceNumber != null) { // shard has ended
+            // FIXME: this logic is not working
+            // was expecting sequenceNumOfLastMsgInShard == endSequenceNumOfShard.
+            // But it is much lesser than the endSeqNumOfShard
+            Map<String, String> shardToSequenceNumberMap = new HashMap<>();
+            shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
+            KinesisCheckpoint shardEndCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+            if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
+              // shard has ended AND we have reached the end checkpoint.
+              // skip this partition group in the result
+              continue;
+            }
+          }
+          newStartCheckpoint = currentEndCheckpoint;
+        } else {
+          newStartCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint());
+        }
+      } else { // new shard
+        Map<String, String> shardToSequenceNumberMap = new HashMap<>();
+        shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
+        newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+      }
+      newPartitionGroupInfos
+          .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
     }
-    return partitionGroupInfos;
+    return newPartitionGroupInfos;
+  }
+
+  /**
+   * Converts a shardId string to a partitionGroupId integer by parsing the digits of the shardId
+   * e.g. "shardId-000000000001" becomes 1
+   */
+  private int getPartitionGroupIdFromShardId(String shardId) {
+    String shardIdNum = StringUtils.stripStart(StringUtils.removeStart(shardId, "shardId-"), "0");
+    return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum);
+  }
 
   @Override
-  public void close()
-      throws IOException {
+  public void close() {
   }
 }
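To make the shardId-to-partitionGroupId mapping above concrete, here is a small standalone sketch of the same parsing logic. ShardIdMappingExample and its main method are illustrative, not part of the commit; it assumes commons-lang3 is on the classpath, as the new StringUtils import in KinesisStreamMetadataProvider does.

    import org.apache.commons.lang3.StringUtils;

    public class ShardIdMappingExample {
      // Mirrors getPartitionGroupIdFromShardId: drop the "shardId-" prefix, strip leading zeros,
      // and treat an all-zero suffix as partition group 0.
      static int toPartitionGroupId(String shardId) {
        String digits = StringUtils.stripStart(StringUtils.removeStart(shardId, "shardId-"), "0");
        return digits.isEmpty() ? 0 : Integer.parseInt(digits);
      }

      public static void main(String[] args) {
        System.out.println(toPartitionGroupId("shardId-000000000000")); // 0
        System.out.println(toPartitionGroupId("shardId-000000000001")); // 1
        System.out.println(toPartitionGroupId("shardId-000000000042")); // 42
      }
    }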
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index c64f710..be2e819 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -64,7 +65,7 @@ public interface StreamMetadataProvider extends Closeable {
    */
  default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
-      throws TimeoutException {
+      throws TimeoutException, IOException {
    int partitionCount = fetchPartitionCount(timeoutMillis);
    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org