This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 214c007c2915c8aa149e1e06689e66abaa85b083
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Thu Jan 7 16:07:16 2021 -0800

    Don't create new CONSUMING segment if shard has reached end of life
---
 .../protocols/SegmentCompletionProtocol.java       |   1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 101 ++++++++++-----------
 .../RealtimeSegmentValidationManager.java          |   2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   3 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  13 ++-
 .../plugin/stream/kinesis/KinesisConsumer.java     |   8 +-
 .../stream/kinesis/KinesisConsumerFactory.java     |   2 +-
 .../plugin/stream/kinesis/KinesisRecordsBatch.java |   9 +-
 .../kinesis/KinesisStreamMetadataProvider.java     |  48 +++++++---
 .../org/apache/pinot/spi/stream/MessageBatch.java  |   7 ++
 10 files changed, 115 insertions(+), 79 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..74614df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,7 @@ public class SegmentCompletionProtocol {
   public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason sent by server as max num rows reached
   public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason sent by server as max time reached
+  public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; // Stop reason sent by server as end of partitionGroup reached

   // Canned responses
   public static final Response RESP_NOT_LEADER =
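As a quick illustration of where this new constant slots in: the server reports one of these reason strings when it decides to stop consuming. A minimal sketch of that selection, assuming hypothetical boolean flags (the constant names are real and come from the hunk above; the class and fields below are invented for illustration):

    import org.apache.pinot.common.protocols.SegmentCompletionProtocol;

    // Illustrative only: picks the stop reason a consuming server would report.
    class StopReasonSketch {
      private boolean _rowLimitReached;      // hypothetical flag
      private boolean _timeLimitReached;     // hypothetical flag
      private boolean _endOfPartitionGroup;  // set when the stream says the shard is done

      String stopReason() {
        if (_rowLimitReached) {
          return SegmentCompletionProtocol.REASON_ROW_LIMIT;
        }
        if (_timeLimitReached) {
          return SegmentCompletionProtocol.REASON_TIME_LIMIT;
        }
        if (_endOfPartitionGroup) {
          // New in this commit: the partition group will never produce more data
          return SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
        }
        return null; // keep consuming
      }
    }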
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index bbd1ef3..9fa6850 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -477,7 +477,6 @@ public class PinotLLCRealtimeSegmentManager {
     Preconditions
         .checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
             "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
-    int numPartitions = getNumPartitionsFromIdealState(idealState);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);

     /*
@@ -496,18 +495,21 @@ public class PinotLLCRealtimeSegmentManager {
     // Step-2
-    // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+    // Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
-    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    // find new partition groups [A],[B],[C],[D]
+    // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+    // If segment has consumed all of A, we will receive B,C,D
+    // If segment still has not reached the last msg of A, we will receive A,B,C,D
     List<PartitionGroupInfo> newPartitionGroupInfoList =
         getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    int numPartitions = newPartitionGroupInfoList.size();

-    // create new segment metadata, only if it is not IN_PROGRESS in the current state
+    // create new segment metadata, only if PartitionGroupInfo was returned for it in the newPartitionGroupInfoList
     Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
         Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
@@ -519,36 +521,25 @@ public class PinotLLCRealtimeSegmentManager {
       PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
       if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
         // make new segment
-        // FIXME: flushThreshold of segment is actually (configured threshold/numPartitions)
-        // In Kinesis, with every split/merge, we get new partitions, and an old partition gets deactivated.
-        // However, the getPartitionGroupInfo call returns ALL shards, regardless of whether they're active or not.
-        // So our numPartitions will forever keep increasing.
-        // TODO: can the getPartitionGroupInfo return the active partitions only, based on the checkpoints passed in current?
+        // fixme: letting validation manager do this would be best, otherwise we risk creating multiple CONSUMING segments
         String newLLCSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
                 instancePartitions, numPartitions, numReplicas);
         newConsumingSegmentNames.add(newLLCSegmentName);
       } else {
-        String currentStatus = currentPartitionGroupMetadata.getStatus();
-        if (!currentStatus.equals(Status.IN_PROGRESS.toString())) {
-          // not IN_PROGRESS anymore in current state. Should be DONE.
-          // This should ONLY happen for the committing segment's partition. Need to trigger new consuming segment
-          // todo: skip this if the partition doesn't match with the committing segment?
+        LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+        // Update this only for committing segment. All other partitions should get updated by their own commit call
+        if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) {
+          Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
           LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
               currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
           createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
               committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
           newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-
-          // FIXME: a new CONSUMING segment is created even if EOL for this shard has been reached.
-          // the logic in getPartitionGroupInfo to prevent returning of EOLed shards isn't working
-          // OPTION: Since consumer knows about it, it can pass param in request/committingSegmentDescriptor "isEndOfShard"
-          // We can set that in metadata for validation manager to skip these partitions
         }
       }
     }
-
     // Step-3
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
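To make the Step-2 rule above easier to follow in isolation, here is a condensed, self-contained sketch of the decision (simplified types and a simplified segment-naming scheme; these are not the actual Pinot classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    // Condensed Step-2: a partition group gets a new CONSUMING segment only if
    // (a) it is brand new, or (b) it is the group of the committing segment.
    // Groups that have reached end of life are simply absent from newGroupIds,
    // so nothing is created for them.
    class Step2Sketch {
      static List<String> newConsumingSegments(String rawTableName, Set<Integer> currentGroupIds,
          List<Integer> newGroupIds, int committingGroupId, int committingSequenceNumber) {
        List<String> result = new ArrayList<>();
        for (int groupId : newGroupIds) {
          if (!currentGroupIds.contains(groupId)) {
            // new shard: start at sequence 0 (segment naming simplified here)
            result.add(rawTableName + "__" + groupId + "__0");
          } else if (groupId == committingGroupId) {
            // committing group: roll over to the next sequence number
            result.add(rawTableName + "__" + groupId + "__" + (committingSequenceNumber + 1));
          }
          // else: that group's own commit call will advance it
        }
        return result;
      }
    }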
@@ -840,8 +831,9 @@ public class PinotLLCRealtimeSegmentManager {
     if (idealState.isEnabled()) {
       List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-      int numPartitions = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList).size();
-      return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
+      List<PartitionGroupInfo> newPartitionGroupInfoList =
+          getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+      return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupInfoList);
     } else {
       LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
       return idealState;
@@ -988,11 +980,14 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
-      IdealState idealState, int numPartitions) {
+      IdealState idealState, List<PartitionGroupInfo> newPartitionGroupInfoList) {
     String realtimeTableName = tableConfig.getTableName();
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+    int numPartitions = newPartitionGroupInfoList.size();
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());

     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -1029,7 +1024,7 @@ public class PinotLLCRealtimeSegmentManager {
       Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
       if (instanceStateMap != null) {
         // Latest segment of metadata is in idealstate.
-        if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
+        if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
           if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
             // step-1 of commmitSegmentMetadata is done (i.e. marking old segment as DONE)
@@ -1040,15 +1035,23 @@ public class PinotLLCRealtimeSegmentManager {
             }
             LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState",
                 latestSegmentName);
-
-            LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-            String newSegmentName = newLLCSegmentName.getSegmentName();
-            CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
-                (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-            createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
-                segmentAssignment, instancePartitionsMap);
+            if (newPartitionGroupSet.contains(partitionGroupId)) {
+              LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+              String newSegmentName = newLLCSegmentName.getSegmentName();
+              CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
+                  (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+              createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
+                  segmentAssignment, instancePartitionsMap);
+            } else { // partition group reached end of life
+              LOGGER.info(
+                  "PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
+                      + "Skipping creation of new ZK metadata and new segment in ideal state",
+                  partitionGroupId, latestSegmentName);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
+                  instancePartitionsMap);
+            }
           } // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
         } else { // no replica in CONSUMING state
@@ -1081,11 +1084,14 @@ public class PinotLLCRealtimeSegmentManager {
           updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
               instancePartitionsMap);
         } else {
-          // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
-          // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
-          // that case, we need to either extend this part to handle the state, or prevent segments from getting into
-          // such state.
-          LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+          if (!newPartitionGroupSet.contains(partitionGroupId)) {
+            // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
+            // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
+            // that case, we need to either extend this part to handle the state, or prevent segments from getting into
+            // such state.
+            LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+          }
+          // else, the partition group has reached end of life. This is an acceptable state
         }
       }
     } else {
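The repair branch above reduces to a simple rule; here is an illustrative condensation (the helper names are hypothetical, not the actual manager code): if the partition group is still live, roll the DONE segment forward to a new CONSUMING segment; if it has reached end of life, only fix up the ideal state and create nothing.

    import java.util.Set;

    // Illustrative condensation of the repair decision for a latest segment
    // that is DONE in ZK metadata but still CONSUMING in the ideal state.
    class RepairSketch {
      static void repair(Set<Integer> newPartitionGroupSet, int partitionGroupId, String latestSegmentName) {
        if (newPartitionGroupSet.contains(partitionGroupId)) {
          // live group: create ZK metadata for the next sequence number and
          // move ideal state latest -> ONLINE, next -> CONSUMING
          createNextConsumingSegment(latestSegmentName);   // hypothetical helper
        } else {
          // end-of-life group: update ideal state for the old segment only;
          // no new segment, no new ZK metadata
          finalizeSegmentOnly(latestSegmentName);          // hypothetical helper
        }
      }

      static void createNextConsumingSegment(String segmentName) { /* omitted */ }
      static void finalizeSegmentOnly(String segmentName) { /* omitted */ }
    }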
@@ -1127,10 +1133,7 @@ public class PinotLLCRealtimeSegmentManager {
     }

     // Set up new partitions if not exist
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-    List<PartitionGroupInfo> partitionGroupInfoList =
-        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
-    for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) {
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
       int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
@@ -1178,18 +1181,6 @@ public class PinotLLCRealtimeSegmentManager {
     return System.currentTimeMillis();
   }

-  // fixme: investigate if this should only return active partitions (i.e. skip a shard if it has reached eol)
-  // or return all unique partitions found in ideal state right from the birth of the table
-  private int getNumPartitionsFromIdealState(IdealState idealState) {
-    Set<String> uniquePartitions = new HashSet<>();
-    for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-        uniquePartitions.add(String.valueOf(new LLCSegmentName(segmentName).getPartitionGroupId()));
-      }
-    }
-    return uniquePartitions.size();
-  }
-
   private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
     if (instancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index d611433..96604dd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
       ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+        6000, pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 75c8057..0f33556 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -850,7 +850,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }

     public void ensureAllPartitionsConsuming() {
-      ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, _numPartitions);
+      ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
+          getPartitionGroupInfoList(_streamConfig, Collections.emptyList()));
     }

     @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 758c656..c889193 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -240,6 +240,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
   private Checkpoint _finalOffset; // Used when we want to catch up to this one
+  private boolean _endOfPartitionGroup = false;
   private volatile boolean _shouldStop = false;

   // It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -306,6 +307,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
       _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
       return true;
+    } else if (_endOfPartitionGroup) {
+      segmentLogger.info("Stopping consumption due to end of partitionGroup reached: numRowsIndexed={}, numRowsConsumed={}, maxRowCount={}",
+          _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+      _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+      // fixme: what happens if reached endOfPartitionGroup but numDocsIndexed == 0
+      // If we decide to only setupNewPartitions via ValidationManager, we don't need commit on endOfShard
+      return true;
     }
     return false;
@@ -384,6 +392,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       try {
         messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         consecutiveErrorCount = 0;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
       }
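The fetch-side wiring above is small; here is a sketch of the flow, using the fetchMessages/isEndOfPartitionGroup calls exactly as they appear in this patch but wrapped in an otherwise invented class (import paths assumed from this branch's SPI):

    import org.apache.pinot.spi.stream.Checkpoint;
    import org.apache.pinot.spi.stream.MessageBatch;
    import org.apache.pinot.spi.stream.PartitionGroupConsumer;

    // Illustrative wrapper: one fetch iteration of the consume loop.
    class ConsumeLoopSketch {
      private final PartitionGroupConsumer _consumer;
      private boolean _endOfPartitionGroup;

      ConsumeLoopSketch(PartitionGroupConsumer consumer) {
        _consumer = consumer;
      }

      void fetchOnce(Checkpoint currentOffset, int fetchTimeoutMillis) throws Exception {
        MessageBatch batch = _consumer.fetchMessages(currentOffset, null, fetchTimeoutMillis);
        // Latch the stream's end-of-life signal; the end-criteria check then
        // stops consumption with REASON_END_OF_PARTITION_GROUP.
        _endOfPartitionGroup = batch.isEndOfPartitionGroup();
      }
    }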
@@ -1245,9 +1254,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // long as the partition function is not changed.
     int numPartitions = columnPartitionConfig.getNumPartitions();
     try {
-      // fixme: get this from ideal state
-      int numStreamPartitions = _streamMetadataProvider
-          .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, Collections.emptyList(), 5000).size();
+      int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
       if (numStreamPartitions != numPartitions) {
         segmentLogger.warn(
             "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 70d2c8a..5cbd7e6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -125,8 +125,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       }
-
-      return new KinesisRecordsBatch(recordList, next.getKey());
+      return new KinesisRecordsBatch(recordList, next.getKey(), isEndOfShard);
     } catch (IllegalStateException e) {
       LOG.warn("Illegal state exception, connection is broken", e);
       return handleException(kinesisStartCheckpoint, recordList);
@@ -158,10 +157,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
       newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
-      return new KinesisRecordsBatch(recordList, shardId);
+      return new KinesisRecordsBatch(recordList, shardId, false);
     } else {
-      return new KinesisRecordsBatch(recordList, shardId);
-
+      return new KinesisRecordsBatch(recordList, shardId, false);
     }
   }
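For background on the isEndOfShard flag being threaded through here: in Kinesis, a shard that was split or merged is closed, and a closed, fully drained shard is signaled by a null next iterator on GetRecords. A sketch of detecting that with AWS SDK v2 (the SDK calls are standard; the surrounding method is illustrative, not this patch's code):

    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
    import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

    // Illustrative: detect that a Kinesis shard is closed and fully consumed.
    class ShardEndSketch {
      static boolean isEndOfShard(KinesisClient kinesisClient, String shardIterator) {
        GetRecordsResponse response =
            kinesisClient.getRecords(GetRecordsRequest.builder().shardIterator(shardIterator).build());
        // A null next iterator means no more records will ever be returned;
        // this is the condition that feeds KinesisRecordsBatch's endOfShard flag.
        return response.nextShardIterator() == null;
      }
    }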
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 631f240..fc9c4af 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -48,7 +48,7 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
   @Override
   public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
-    return new KinesisStreamMetadataProvider(clientId, new KinesisConfig(_streamConfig));
+    return new KinesisStreamMetadataProvider(clientId, _streamConfig);
   }

   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index fdc883b..b3eb626 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -32,10 +32,12 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   private final List<Record> _recordList;
   private final String _shardId;
+  private final boolean _endOfShard;

-  public KinesisRecordsBatch(List<Record> recordList, String shardId) {
+  public KinesisRecordsBatch(List<Record> recordList, String shardId, boolean endOfShard) {
     _recordList = recordList;
     _shardId = shardId;
+    _endOfShard = endOfShard;
   }

   @Override
@@ -68,4 +70,9 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   public long getNextStreamMessageOffsetAtIndex(int index) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public boolean isEndOfPartitionGroup() {
+    return _endOfShard;
+  }
 }
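A tiny usage example of the new constructor argument (hypothetical shard id; an empty record list stands in for a drained shard):

    import java.util.Collections;

    class KinesisBatchExample {
      public static void main(String[] args) {
        // A batch from a shard that is closed and fully consumed.
        KinesisRecordsBatch batch =
            new KinesisRecordsBatch(Collections.emptyList(), "shardId-000000000000", /*endOfShard=*/true);
        System.out.println(batch.isEndOfPartitionGroup()); // prints: true
      }
    }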
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 6c55a18..8968b56 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -1,27 +1,45 @@
 package org.apache.pinot.plugin.stream.kinesis;

+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import software.amazon.awssdk.services.kinesis.model.Shard;

 public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
   private final KinesisConnectionHandler _kinesisConnectionHandler;
+  private final StreamConsumerFactory _kinesisStreamConsumerFactory;
+  private final String _clientId;
+  private final int _fetchTimeoutMs;

-  public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
+  public KinesisStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    KinesisConfig kinesisConfig = new KinesisConfig(streamConfig);
     _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+    _kinesisStreamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    _clientId = clientId;
+    _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
   }

   @Override
@@ -37,7 +55,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
   @Override
   public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
-      throws IOException {
+      throws IOException, TimeoutException {
     Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
         currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
@@ -45,10 +63,12 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
     for (Shard shard : shards) { // go over all shards
+      KinesisCheckpoint newStartCheckpoint;
+
       String shardId = shard.shardId();
       int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
       PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
-      KinesisCheckpoint newStartCheckpoint;
+
       if (currentPartitionGroupMetadata != null) { // existing shard
         KinesisCheckpoint currentEndCheckpoint = null;
         try {
@@ -59,15 +79,18 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
         if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
           String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
           if (endingSequenceNumber != null) { // shard has ended
-            // FIXME: this logic is not working
-            // was expecting sequenceNumOfLastMsgInShard == endSequenceNumOfShard.
-            // But it is much lesser than the endSeqNumOfShard
-            Map<String, String> shardToSequenceNumberMap = new HashMap<>();
-            shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
-            KinesisCheckpoint shardEndCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
-            if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
-              // shard has ended AND we have reached the end checkpoint.
-              // skip this partition group in the result
+            // check if segment has consumed all the messages already
+            PartitionGroupConsumer partitionGroupConsumer =
+                _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata);
+
+            MessageBatch messageBatch;
+            try {
+              messageBatch = partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs);
+            } finally {
+              partitionGroupConsumer.close();
+            }
+            if (messageBatch.isEndOfPartitionGroup()) {
+              // shard has ended. Skip it in the results
              continue;
             }
           }
@@ -80,6 +103,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
         shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
         newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
       }
+
       newPartitionGroupInfos
           .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
     }
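Distilled, the end-of-life probe added above does the following; a simplified sketch (the SPI calls match this patch, but the standalone method is illustrative):

    import org.apache.pinot.spi.stream.MessageBatch;
    import org.apache.pinot.spi.stream.PartitionGroupConsumer;

    // Condensed EOL probe: a closed shard (endingSequenceNumber != null) is
    // excluded from the returned partition groups once a fetch from the last
    // committed checkpoint reports end-of-partition-group, so no new
    // CONSUMING segment ever gets created for it.
    class EolProbeSketch {
      static boolean shardFullyConsumed(PartitionGroupConsumer consumer, KinesisCheckpoint currentEndCheckpoint,
          int fetchTimeoutMs) throws Exception {
        try {
          MessageBatch messageBatch = consumer.fetchMessages(currentEndCheckpoint, null, fetchTimeoutMs);
          return messageBatch.isEndOfPartitionGroup();
        } finally {
          consumer.close();
        }
      }
    }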
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..02c721f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -81,4 +81,11 @@ public interface MessageBatch<T> {
   default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
+
+  /**
+   * Returns true if the consumer detects that no more records can be read from this partition group for good
+   */
+  default boolean isEndOfPartitionGroup() {
+    return false;
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org