This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new f00c8dc End-of-shard as end criteria AND consume shards in order f00c8dc is described below commit f00c8dca81cf6adc7863b05a47a0801e5a564442 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Fri Jan 15 17:38:28 2021 -0800 End-of-shard as end criteria AND consume shards in order --- .../segment/LLCRealtimeSegmentZKMetadata.java | 5 - .../protocols/SegmentCompletionProtocol.java | 2 + .../realtime/PinotLLCRealtimeSegmentManager.java | 31 +++--- .../RealtimeSegmentValidationManager.java | 2 +- .../PinotLLCRealtimeSegmentManagerTest.java | 2 +- .../realtime/LLRealtimeSegmentDataManager.java | 37 ++++--- .../kinesis/KinesisStreamMetadataProvider.java | 106 ++++++++++++--------- .../pinot/spi/stream/PartitionGroupInfo.java | 6 +- .../pinot/spi/stream/PartitionGroupMetadata.java | 16 ++-- .../pinot/spi/stream/StreamMetadataProvider.java | 2 +- 10 files changed, 120 insertions(+), 89 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java index b8b8d95..7cb19a7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java @@ -87,11 +87,6 @@ public class LLCRealtimeSegmentZKMetadata extends RealtimeSegmentZKMetadata { public ZNRecord toZNRecord() { ZNRecord znRecord = super.toZNRecord(); znRecord.setSimpleField(START_OFFSET, _startOffset); - if (_endOffset == null) { - // TODO Issue 5359 Keep this until all components have upgraded to a version that can handle _offset being null - // For backward compatibility until all components have been upgraded to deal with null value for _endOffset - _endOffset = Long.toString(Long.MAX_VALUE); - } znRecord.setSimpleField(END_OFFSET, _endOffset); znRecord.setIntField(NUM_REPLICAS, _numReplicas); znRecord.setSimpleField(DOWNLOAD_URL, _downloadUrl); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java index dd1330d..6dcbda2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java @@ -138,6 +138,8 @@ public class SegmentCompletionProtocol { public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason sent by server as max num rows reached public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason sent by server as max time reached + public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; + // Stop reason sent by server as end of partitionGroup reached // Canned responses public static final Response RESP_NOT_LEADER = diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 8c17ff4..3a3dcf6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -78,6 +78,7 @@ import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory; import org.apache.pinot.spi.stream.PartitionGroupInfo; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; @@ -166,7 +167,8 @@ public class PinotLLCRealtimeSegmentManager { * Using the ideal state and segment metadata, return a list of {@link PartitionGroupMetadata} * for latest segment of each partition group */ - public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) { + public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState, + StreamConfig streamConfig) { List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); // From all segment names in the ideal state, find unique partition group ids and their latest segment @@ -185,6 +187,8 @@ public class PinotLLCRealtimeSegmentManager { } // Create a PartitionGroupMetadata for each latest segment + PartitionGroupCheckpointFactory checkpointFactory = + StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); LLCSegmentName llcSegmentName = entry.getValue(); @@ -195,7 +199,9 @@ public class PinotLLCRealtimeSegmentManager { (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata; PartitionGroupMetadata partitionGroupMetadata = new PartitionGroupMetadata(partitionGroupId, llcSegmentName.getSequenceNumber(), - llRealtimeSegmentZKMetadata.getStartOffset(), llRealtimeSegmentZKMetadata.getEndOffset(), + checkpointFactory.create(llRealtimeSegmentZKMetadata.getStartOffset()), + llRealtimeSegmentZKMetadata.getEndOffset() == null ? null + : checkpointFactory.create(llRealtimeSegmentZKMetadata.getEndOffset()), llRealtimeSegmentZKMetadata.getStatus().toString()); partitionGroupMetadataList.add(partitionGroupMetadata); } @@ -498,9 +504,10 @@ public class PinotLLCRealtimeSegmentManager { // Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing. // Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS] - List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState); PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); + List<PartitionGroupMetadata> currentPartitionGroupMetadataList = + getCurrentPartitionGroupMetadataList(idealState, streamConfig); // Find new partition groups [A],[B],[C],[D] (assume A split into C D) // If segment has consumed all of A, we will receive B,C,D @@ -610,9 +617,7 @@ public class PinotLLCRealtimeSegmentManager { int numPartitions, int numReplicas) { String realtimeTableName = tableConfig.getTableName(); String segmentName = newLLCSegmentName.getSegmentName(); - StreamPartitionMsgOffsetFactory offsetFactory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); - StreamPartitionMsgOffset startOffset = offsetFactory.create(committingSegmentDescriptor.getNextOffset()); + String startOffset = committingSegmentDescriptor.getNextOffset(); LOGGER .info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", segmentName, startOffset, creationTimeMs); @@ -621,7 +626,7 @@ public class PinotLLCRealtimeSegmentManager { newSegmentZKMetadata.setTableName(realtimeTableName); newSegmentZKMetadata.setSegmentName(segmentName); newSegmentZKMetadata.setCreationTime(creationTimeMs); - newSegmentZKMetadata.setStartOffset(startOffset.toString()); + newSegmentZKMetadata.setStartOffset(startOffset); // Leave maxOffset as null. newSegmentZKMetadata.setNumReplicas(numReplicas); newSegmentZKMetadata.setStatus(Status.IN_PROGRESS); @@ -808,7 +813,7 @@ public class PinotLLCRealtimeSegmentManager { assert idealState != null; if (idealState.isEnabled()) { List<PartitionGroupMetadata> currentPartitionGroupMetadataList = - getCurrentPartitionGroupMetadataList(idealState); + getCurrentPartitionGroupMetadataList(idealState, streamConfig); List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupInfoList); @@ -1102,7 +1107,7 @@ public class PinotLLCRealtimeSegmentManager { return idealState; } - private StreamPartitionMsgOffset getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) { + private Checkpoint getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) { Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap()); streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), @@ -1111,12 +1116,10 @@ public class PinotLLCRealtimeSegmentManager { new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria); List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo = getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, Collections.emptyList()); - StreamPartitionMsgOffset partitionStartOffset = null; + Checkpoint partitionStartOffset = null; for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) { if (info.getPartitionGroupId() == partitionGroupId) { - StreamPartitionMsgOffsetFactory factory = - StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); - partitionStartOffset = factory.create(info.getStartCheckpoint()); + partitionStartOffset = info.getStartCheckpoint(); break; } } @@ -1136,7 +1139,7 @@ public class PinotLLCRealtimeSegmentManager { long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) { String realtimeTableName = tableConfig.getTableName(); int partitionGroupId = partitionGroupInfo.getPartitionGroupId(); - String startCheckpoint = partitionGroupInfo.getStartCheckpoint(); + String startCheckpoint = partitionGroupInfo.getStartCheckpoint().toString(); LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 96604dd..d611433 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) { super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(), - 6000, pinotHelixResourceManager, + config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; _validationMetrics = validationMetrics; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index c19a845..ecbf2ef 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -920,7 +920,7 @@ public class PinotLLCRealtimeSegmentManagerTest { List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, - PARTITION_OFFSET.toString())) + PARTITION_OFFSET)) .collect(Collectors.toList()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 4c1d8f4..63eafc1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -160,7 +160,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) { _segmentTarFile = segmentTarFile; _metadataFileMap = metadataFileMap; - _offset = _streamPartitionMsgOffsetFactory.create(offset); + _offset = _checkpointFactory.create(offset); _buildTimeMillis = buildTimeMillis; _waitTimeMillis = waitTimeMillis; _segmentSizeBytes = segmentSizeBytes; @@ -235,11 +235,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private final SegmentVersion _segmentVersion; private final SegmentBuildTimeLeaseExtender _leaseExtender; private SegmentBuildDescriptor _segmentBuildDescriptor; - private StreamConsumerFactory _streamConsumerFactory; - private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory; + private final StreamConsumerFactory _streamConsumerFactory; + private final PartitionGroupCheckpointFactory _checkpointFactory; // Segment end criteria private volatile long _consumeEndTime = 0; + private boolean _endOfPartitionGroup = false; private Checkpoint _finalOffset; // Used when we want to catch up to this one private volatile boolean _shouldStop = false; @@ -263,7 +264,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { private final List<String> _noDictionaryColumns; private final List<String> _varLengthDictionaryColumns; private final String _sortedColumn; - private Logger segmentLogger; + private final Logger segmentLogger; private final String _tableStreamName; private final PinotDataBufferMemoryManager _memoryManager; private AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0); @@ -306,6 +307,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT; return true; + } else if (_endOfPartitionGroup) { + segmentLogger.info( + "Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}", + _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); + _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP; + // fixme: Handle creating a segment with 0 rows. + // Happens if endOfPartitionGroup reached but no rows were consumed + return true; } return false; @@ -369,7 +378,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { final long idlePipeSleepTimeMillis = 100; final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig .getFetchTimeoutMillis()); // 3 minute count - Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory + Checkpoint lastUpdatedOffset = _checkpointFactory .create(_currentOffset); // so that we always update the metric when we enter this method. long consecutiveIdleCount = 0; // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid @@ -384,6 +393,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { try { messageBatch = _partitionGroupConsumer .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis()); + _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup(); consecutiveErrorCount = 0; } catch (TimeoutException e) { handleTransientStreamErrors(e); @@ -410,7 +420,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset.getOffset()); // _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset.getOffset()); _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1); - lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset); + lastUpdatedOffset = _checkpointFactory.create(_currentOffset); } else { // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time. // Create a new stream consumer wrapper, in case we are stuck on something. @@ -668,10 +678,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { @VisibleForTesting protected Checkpoint extractOffset(SegmentCompletionProtocol.Response response) { if (response.getStreamPartitionMsgOffset() != null) { - return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset()); + return _checkpointFactory.create(response.getStreamPartitionMsgOffset()); } else { // TODO Issue 5359 Remove this once the protocol is upgraded on server and controller - return _streamPartitionMsgOffsetFactory.create(Long.toString(response.getOffset())); + return _checkpointFactory.create(Long.toString(response.getOffset())); } } @@ -966,7 +976,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Remove the segment file before we do anything else. removeSegmentFile(); _leaseExtender.removeSegment(_segmentNameStr); - final Checkpoint endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset()); + final Checkpoint endOffset = _checkpointFactory.create(llcMetadata.getEndOffset()); segmentLogger .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(), _startOffset, endOffset); @@ -1126,14 +1136,15 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _partitionLevelStreamConfig = new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig)); _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig); - _streamPartitionMsgOffsetFactory = + _checkpointFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory(); _streamTopic = _partitionLevelStreamConfig.getTopicName(); _segmentNameStr = _segmentZKMetadata.getSegmentName(); _llcSegmentName = llcSegmentName; _partitionGroupId = _llcSegmentName.getPartitionGroupId(); _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(), - _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(), + _checkpointFactory.create(_segmentZKMetadata.getStartOffset()), + _segmentZKMetadata.getEndOffset() == null ? null : _checkpointFactory.create(_segmentZKMetadata.getEndOffset()), _segmentZKMetadata.getStatus().toString()); _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore; _acquiredConsumerSemaphore = new AtomicBoolean(false); @@ -1273,8 +1284,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics); - _startOffset = _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()); - _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset); + _startOffset = _checkpointFactory.create(_segmentZKMetadata.getStartOffset()); + _currentOffset = _checkpointFactory.create(_startOffset); _resourceTmpDir = new File(resourceDataDir, "_tmp"); if (!_resourceTmpDir.exists()) { _resourceTmpDir.mkdirs(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java index b22bbe4..42150a3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java @@ -21,12 +21,15 @@ package org.apache.pinot.plugin.stream.kinesis; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupConsumer; @@ -69,7 +72,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { /** * This call returns all active shards, taking into account the consumption status for those shards. * PartitionGroupInfo is returned for a shard if: - * 1. It is a branch new shard i.e. no partitionGroupMetadata was found for it in the current list + * 1. It is a branch new shard AND its parent has been consumed completely * 2. It is still being actively consumed from i.e. the consuming partition has not reached the end of the shard */ @Override @@ -77,54 +80,57 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis) throws IOException, TimeoutException { - Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = currentPartitionGroupsMetadata.stream() - .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p)); - List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>(); - List<Shard> shards = _kinesisConnectionHandler.getShards(); - for (Shard shard : shards) { - KinesisCheckpoint newStartCheckpoint; - - String shardId = shard.shardId(); - int partitionGroupId = getPartitionGroupIdFromShardId(shardId); - PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId); - - if (currentPartitionGroupMetadata != null) { // existing shard - KinesisCheckpoint currentEndCheckpoint = null; - try { - currentEndCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint()); - } catch (Exception e) { - // ignore. No end checkpoint yet for IN_PROGRESS segment - } - if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing/committed segment - String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); - if (endingSequenceNumber != null) { // shard has ended - // check if segment has consumed all the messages already - PartitionGroupConsumer partitionGroupConsumer = - _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata); - - MessageBatch messageBatch; - try { - messageBatch = partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs); - } finally { - partitionGroupConsumer.close(); - } - if (messageBatch.isEndOfPartitionGroup()) { - // shard has ended. Skip it from results - continue; - } + + Map<String, Shard> shardIdToShardMap = + _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s)); + Set<String> shardsInCurrent = new HashSet<>(); + Set<String> shardsEnded = new HashSet<>(); + + // TODO: Once we start supporting multiple shards in a PartitionGroup, + // we need to iterate over all shards to check if any of them have reached end + + // Process existing shards. Add them to new list if still consuming from them + for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { + KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) currentPartitionGroupMetadata.getStartCheckpoint(); + String shardId = kinesisStartCheckpoint.getShardToStartSequenceMap().keySet().iterator().next(); + Shard shard = shardIdToShardMap.get(shardId); + shardsInCurrent.add(shardId); + + Checkpoint newStartCheckpoint; + Checkpoint currentEndCheckpoint = currentPartitionGroupMetadata.getEndCheckpoint(); + if (currentEndCheckpoint != null) { // Segment DONE (committing/committed) + String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); + if (endingSequenceNumber != null) { // Shard has ended, check if we're also done consuming it + if (consumedEndOfShard(currentEndCheckpoint, currentPartitionGroupMetadata)) { + shardsEnded.add(shardId); + continue; // Shard ended and we're done consuming it. Skip } - newStartCheckpoint = currentEndCheckpoint; - } else { - newStartCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint()); } - } else { // new shard + newStartCheckpoint = currentEndCheckpoint; + } else { // Segment IN_PROGRESS + newStartCheckpoint = currentPartitionGroupMetadata.getStartCheckpoint(); + } + newPartitionGroupInfos.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), newStartCheckpoint)); + } + + // Add new shards. Parent should be null (new table case, very first shards) OR we should be flagged as reached EOL and completely consumed. + for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) { + String newShardId = entry.getKey(); + if (shardsInCurrent.contains(newShardId)) { + continue; + } + Checkpoint newStartCheckpoint; + Shard newShard = entry.getValue(); + String parentShardId = newShard.parentShardId(); + + if (parentShardId == null || shardsEnded.contains(parentShardId)) { Map<String, String> shardToSequenceNumberMap = new HashMap<>(); - shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber()); + shardToSequenceNumberMap.put(newShardId, newShard.sequenceNumberRange().startingSequenceNumber()); newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap); + int partitionGroupId = getPartitionGroupIdFromShardId(newShardId); + newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint)); } - - newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize())); } return newPartitionGroupInfos; } @@ -138,6 +144,20 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum); } + private boolean consumedEndOfShard(Checkpoint startCheckpoint, PartitionGroupMetadata partitionGroupMetadata) + throws IOException, TimeoutException { + PartitionGroupConsumer partitionGroupConsumer = + _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, partitionGroupMetadata); + + MessageBatch messageBatch; + try { + messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, null, _fetchTimeoutMs); + } finally { + partitionGroupConsumer.close(); + } + return messageBatch.isEndOfPartitionGroup(); + } + @Override public void close() { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java index 758953d..b06e878 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java @@ -27,9 +27,9 @@ package org.apache.pinot.spi.stream; public class PartitionGroupInfo { private final int _partitionGroupId; - private final String _startCheckpoint; + private final Checkpoint _startCheckpoint; - public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) { + public PartitionGroupInfo(int partitionGroupId, Checkpoint startCheckpoint) { _partitionGroupId = partitionGroupId; _startCheckpoint = startCheckpoint; } @@ -38,7 +38,7 @@ public class PartitionGroupInfo { return _partitionGroupId; } - public String getStartCheckpoint() { + public Checkpoint getStartCheckpoint() { return _startCheckpoint; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java index a99a82b..1ac12fb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -22,12 +22,12 @@ public class PartitionGroupMetadata { private final int _partitionGroupId; private int _sequenceNumber; - private String _startCheckpoint; - private String _endCheckpoint; + private Checkpoint _startCheckpoint; + private Checkpoint _endCheckpoint; private String _status; - public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, String startCheckpoint, - String endCheckpoint, String status) { + public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, Checkpoint startCheckpoint, + Checkpoint endCheckpoint, String status) { _partitionGroupId = partitionGroupId; _sequenceNumber = sequenceNumber; _startCheckpoint = startCheckpoint; @@ -47,19 +47,19 @@ public class PartitionGroupMetadata { _sequenceNumber = sequenceNumber; } - public String getStartCheckpoint() { + public Checkpoint getStartCheckpoint() { return _startCheckpoint; } - public void setStartCheckpoint(String startCheckpoint) { + public void setStartCheckpoint(Checkpoint startCheckpoint) { _startCheckpoint = startCheckpoint; } - public String getEndCheckpoint() { + public Checkpoint getEndCheckpoint() { return _endCheckpoint; } - public void setEndCheckpoint(String endCheckpoint) { + public void setEndCheckpoint(Checkpoint endCheckpoint) { _endCheckpoint = endCheckpoint; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index cecc708..4b2751c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -82,7 +82,7 @@ public interface StreamMetadataProvider extends Closeable { streamConsumerFactory.createPartitionMetadataProvider(clientId, i); StreamPartitionMsgOffset streamPartitionMsgOffset = partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis); - newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString())); + newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset)); } return newPartitionGroupInfoList; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org