This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new bf81aac Cleanup, javadocs, comments bf81aac is described below commit bf81aac4f80b9197c28250c49c00d4c2314f8961 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Fri Jan 8 18:28:04 2021 -0800 Cleanup, javadocs, comments --- .../protocols/SegmentCompletionProtocol.java | 1 - .../helix/core/PinotTableIdealStateBuilder.java | 8 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 76 ++++++++++--------- .../PinotLLCRealtimeSegmentManagerTest.java | 21 ++---- .../realtime/LLRealtimeSegmentDataManager.java | 21 +++--- .../plugin/stream/kinesis/KinesisCheckpoint.java | 10 ++- .../pinot/plugin/stream/kinesis/KinesisConfig.java | 3 + .../stream/kinesis/KinesisConnectionHandler.java | 17 +++-- .../plugin/stream/kinesis/KinesisConsumer.java | 20 +++-- .../stream/kinesis/KinesisConsumerFactory.java | 8 +- .../stream/kinesis/KinesisMsgOffsetFactory.java | 4 + .../plugin/stream/kinesis/KinesisRecordsBatch.java | 6 +- .../kinesis/KinesisStreamMetadataProvider.java | 27 +++---- .../org/apache/pinot/spi/stream/Checkpoint.java | 5 ++ .../stream/PartitionGroupCheckpointFactory.java | 12 +-- .../pinot/spi/stream/PartitionGroupConsumer.java | 16 +++- .../pinot/spi/stream/PartitionGroupInfo.java | 13 ++-- .../spi/stream/PartitionGroupInfoFetcher.java | 2 +- .../pinot/spi/stream/PartitionGroupMetadata.java | 4 - .../pinot/spi/stream/PartitionLevelConsumer.java | 6 +- .../pinot/spi/stream/PartitionOffsetFetcher.java | 88 ---------------------- .../pinot/spi/stream/StreamConsumerFactory.java | 10 ++- .../pinot/spi/stream/StreamMetadataProvider.java | 9 +-- 23 files changed, 170 insertions(+), 217 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java index 74614df..dd1330d 100644 --- 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java @@ -138,7 +138,6 @@ public class SegmentCompletionProtocol { public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason sent by server as max num rows reached public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason sent by server as max time reached - public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; // Stop reason sent by server as end of partitionGroup reached // Canned responses public static final Response RESP_NOT_LEADER = diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 68bcf57..98fbd5d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -117,6 +117,12 @@ public class PinotTableIdealStateBuilder { pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState); } + /** + * Fetches the list of {@link PartitionGroupInfo} for the stream, with the help of the current partitionGroups metadata + * This call will only skip partitions which have reached end of life and all messages from that partition have been consumed. + * The current partition group metadata is used to determine the offsets that have been consumed for a partition. 
+ * The current partition group metadata is also used to know about existing partition groupings which should not be disturbed + */ public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { PartitionGroupInfoFetcher partitionGroupInfoFetcher = @@ -126,7 +132,7 @@ public class PinotTableIdealStateBuilder { return partitionGroupInfoFetcher.getPartitionGroupInfoList(); } catch (Exception e) { Exception fetcherException = partitionGroupInfoFetcher.getException(); - LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException); + LOGGER.error("Could not get partition group info for {}", streamConfig.getTopicName(), fetcherException); throw new RuntimeException(fetcherException); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 5fd5c3f..8c17ff4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -24,7 +24,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -77,11 +76,11 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupInfo; import 
org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; -import org.apache.pinot.spi.stream.PartitionOffsetFetcher; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; @@ -164,17 +163,18 @@ public class PinotLLCRealtimeSegmentManager { /** - * Using the ideal state and segment metadata, return a list of the current partition groups + * Using the ideal state and segment metadata, return a list of {@link PartitionGroupMetadata} + * for latest segment of each partition group */ public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) { List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(); - // from all segment names in the ideal state, find unique groups - Map<Integer, LLCSegmentName> groupIdToLatestSegment = new HashMap<>(); + // From all segment names in the ideal state, find unique partition group ids and their latest segment + Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegment = new HashMap<>(); for (String segment : idealState.getPartitionSet()) { LLCSegmentName llcSegmentName = new LLCSegmentName(segment); int partitionGroupId = llcSegmentName.getPartitionGroupId(); - groupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> { + partitionGroupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> { if (latestSegment == null) { return llcSegmentName; } else { @@ -184,8 +184,8 @@ public class PinotLLCRealtimeSegmentManager { }); } - // create a PartitionGroupMetadata for each latest segment - for (Map.Entry<Integer, LLCSegmentName> entry : groupIdToLatestSegment.entrySet()) { + // Create a PartitionGroupMetadata for each latest segment + for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) { int partitionGroupId = entry.getKey(); LLCSegmentName 
llcSegmentName = entry.getValue(); RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider @@ -258,10 +258,8 @@ public class PinotLLCRealtimeSegmentManager { PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); - // get new partition groups and their metadata List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList()); int numPartitionGroups = newPartitionGroupInfoList.size(); - int numReplicas = getNumReplicas(tableConfig, instancePartitions); SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig); @@ -699,27 +697,16 @@ public class PinotLLCRealtimeSegmentManager { return commitTimeoutMS; } + /** + * Fetches the latest state of the PartitionGroups for the stream + * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, it will be skipped from the result + */ @VisibleForTesting List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { return PinotTableIdealStateBuilder.getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList); } - @VisibleForTesting - StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, - int partitionGroupId) { - PartitionOffsetFetcher partitionOffsetFetcher = - new PartitionOffsetFetcher(offsetCriteria, partitionGroupId, streamConfig); - try { - RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher); - return partitionOffsetFetcher.getOffset(); - } catch (Exception e) { - throw new IllegalStateException(String - .format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s", - 
streamConfig.getTopicName(), partitionGroupId, offsetCriteria), e); - } - } - /** * An instance is reporting that it has stopped consuming a topic due to some error. * If the segment is in CONSUMING state, mark the state of the segment to be OFFLINE in idealstate. @@ -1033,26 +1020,26 @@ public class PinotLLCRealtimeSegmentManager { // Create a new segment to re-consume from the previous start offset LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs); - StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); + Checkpoint startCheckpoint = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); + Checkpoint partitionGroupStartCheckpoint = getPartitionGroupStartCheckpoint(streamConfig, partitionGroupId); + // Start offset must be higher than the start offset of the stream - StreamPartitionMsgOffset partitionStartOffset = - getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId); - if (partitionStartOffset.compareTo(startOffset) > 0) { - LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset, - partitionStartOffset, partitionGroupId, realtimeTableName); + if (partitionGroupStartCheckpoint.compareTo(startCheckpoint) > 0) { + LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startCheckpoint, + partitionGroupStartCheckpoint, partitionGroupId, realtimeTableName); _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); - startOffset = partitionStartOffset; + startCheckpoint = partitionGroupStartCheckpoint; } CommittingSegmentDescriptor committingSegmentDescriptor = - new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0); + new CommittingSegmentDescriptor(latestSegmentName, startCheckpoint.toString(), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, 
committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); String newSegmentName = newLLCSegmentName.getSegmentName(); updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, instancePartitionsMap); } else { - if (!newPartitionGroupSet.contains(partitionGroupId)) { + if (newPartitionGroupSet.contains(partitionGroupId)) { // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In // that case, we need to either extend this part to handle the state, or prevent segments from getting into @@ -1115,6 +1102,27 @@ public class PinotLLCRealtimeSegmentManager { return idealState; } + private StreamPartitionMsgOffset getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) { + Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap()); + streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties + .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), + OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString()); + StreamConfig smallestOffsetCriteriaStreamConfig = + new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria); + List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo = + getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, Collections.emptyList()); + StreamPartitionMsgOffset partitionStartOffset = null; + for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) { + if (info.getPartitionGroupId() == partitionGroupId) { + StreamPartitionMsgOffsetFactory factory = + StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); + partitionStartOffset = 
factory.create(info.getStartCheckpoint()); + break; + } + } + return partitionStartOffset; + } + private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) { return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(), lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 0f33556..c19a845 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -908,32 +908,23 @@ public class PinotLLCRealtimeSegmentManagerTest { @Override void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, - List<String> newSegmentNames, SegmentAssignment segmentAssignment, + String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, - null, segmentAssignment, instancePartitionsMap); - for (String segmentName : newSegmentNames) { - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, - segmentName, segmentAssignment, instancePartitionsMap); - } + updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null, + segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName, + segmentAssignment, instancePartitionsMap); } @Override List<PartitionGroupInfo> 
getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) { return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, - getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString())) + PARTITION_OFFSET.toString())) .collect(Collectors.toList()); } @Override - LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionGroupId) { - // The criteria for this test should always be SMALLEST (for default streaming config and new added partitions) - assertTrue(offsetCriteria.isSmallest()); - return PARTITION_OFFSET; - } - - @Override boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, long currentTimeMs) { return _exceededMaxSegmentCompletionTime; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 1569d8e..4c1d8f4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -240,7 +241,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // Segment end criteria private volatile long _consumeEndTime = 0; private Checkpoint _finalOffset; // Used when we want to catch up to this one - private boolean _endOfPartitionGroup = false; private volatile boolean _shouldStop = false; // It takes 30s to locate 
controller leader, and more if there are multiple controller failures. @@ -306,12 +306,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT; return true; - } else if (_endOfPartitionGroup) { - // FIXME: handle numDocsIndexed == 0 case - segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}", - _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount); - _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP; - return true; } return false; @@ -390,8 +384,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { try { messageBatch = _partitionGroupConsumer .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis()); - _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup(); consecutiveErrorCount = 0; + } catch (TimeoutException e) { + handleTransientStreamErrors(e); + continue; } catch (TransientConsumerException e) { handleTransientStreamErrors(e); continue; @@ -1247,7 +1243,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { // long as the partition function is not changed. 
int numPartitions = columnPartitionConfig.getNumPartitions(); try { - int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L); + // TODO: currentPartitionGroupMetadata should be fetched from idealState + segmentZkMetadata, so that we get back accurate partitionGroups info + // However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has a single partition + // Fix this before opening support for partitioning in Kinesis + int numStreamPartitions = _streamMetadataProvider + .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, + Collections.emptyList(), /*maxWaitTimeMs=*/5000).size(); if (numStreamPartitions != numPartitions) { segmentLogger.warn( "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions", @@ -1329,7 +1330,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { closeStreamMetadataProvider(); } segmentLogger.info("Creating new stream metadata provider, reason: {}", reason); - _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId); + _streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(_clientId); } // This should be done during commit? We may not always commit when we build a segment.... 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 517f8c0..e1f8b05 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -22,12 +22,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.util.Map; +import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.JsonUtils; +/** + * A {@link Checkpoint} implementation for the Kinesis partition group consumption + * A partition group consists of 1 or more shards. 
The KinesisCheckpoint maintains a Map of shards to the sequenceNumber + */ public class KinesisCheckpoint implements StreamPartitionMsgOffset { - private Map<String, String> _shardToStartSequenceMap; + private final Map<String, String> _shardToStartSequenceMap; public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) { _shardToStartSequenceMap = shardToStartSequenceMap; @@ -68,6 +73,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset { @Override public int compareTo(Object o) { - return this._shardToStartSequenceMap.values().iterator().next().compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next()); + return this._shardToStartSequenceMap.values().iterator().next() + .compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next()); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index 529f34f..fbe369f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -23,6 +23,9 @@ import org.apache.pinot.spi.stream.StreamConfig; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +/** + * Kinesis stream specific config + */ public class KinesisConfig { public static final String STREAM = "stream"; public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type"; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index 
4d968f6..61d065e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -27,14 +27,13 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; +/** + * Manages the Kinesis stream connection, given the stream name and aws region + */ public class KinesisConnectionHandler { KinesisClient _kinesisClient; - private String _stream; - private String _awsRegion; - - public KinesisConnectionHandler() { - - } + private final String _stream; + private final String _awsRegion; public KinesisConnectionHandler(String stream, String awsRegion) { _stream = stream; @@ -42,12 +41,18 @@ public class KinesisConnectionHandler { createConnection(); } + /** + * Lists all shards of the stream + */ public List<Shard> getShards() { ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build()); return listShardsResponse.shards(); } + /** + * Creates a Kinesis client for the stream + */ public void createConnection() { if (_kinesisClient == null) { _kinesisClient = diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 5cbd7e6..9c56f95 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -43,6 +43,9 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import 
software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +/** + * A {@link PartitionGroupConsumer} implementation for the Kinesis stream + */ public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer { private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); String _stream; @@ -58,16 +61,19 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti _executorService = Executors.newSingleThreadExecutor(); } + /** + * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint + */ @Override - public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeoutMs) { + public KinesisRecordsBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs) { List<Record> recordList = new ArrayList<>(); Future<KinesisRecordsBatch> kinesisFetchResultFuture = - _executorService.submit(() -> getResult(start, end, recordList)); + _executorService.submit(() -> getResult(startCheckpoint, endCheckpoint, recordList)); try { return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (Exception e) { - return handleException((KinesisCheckpoint) start, recordList); + return handleException((KinesisCheckpoint) startCheckpoint, recordList); } } @@ -81,6 +87,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti } //TODO: iterate upon all the shardIds in the map + // Okay for now, since we have assumed that every partition group contains a single shard Map.Entry<String, String> next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next(); String shardIterator = getShardIterator(next.getKey(), next.getValue()); @@ -156,14 +163,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); Map<String, String> newCheckpoint = new 
HashMap<>(start.getShardToStartSequenceMap()); newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber); - - return new KinesisRecordsBatch(recordList, shardId, false); - } else { - return new KinesisRecordsBatch(recordList, shardId, false); } + return new KinesisRecordsBatch(recordList, shardId, false); } - public String getShardIterator(String shardId, String sequenceNumber) { + private String getShardIterator(String shardId, String sequenceNumber) { GetShardIteratorRequest.Builder requestBuilder = GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index fc9c4af..6792fb9 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -28,6 +28,9 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +/** + * {@link StreamConsumerFactory} implementation for the Kinesis stream + */ public class KinesisConsumerFactory extends StreamConsumerFactory { @Override @@ -43,7 +46,7 @@ public class KinesisConsumerFactory extends StreamConsumerFactory { @Override public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) { - return null; + throw new UnsupportedOperationException(); } @Override @@ -52,7 +55,8 @@ public class KinesisConsumerFactory extends StreamConsumerFactory { } @Override - public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata 
metadata) { + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, + PartitionGroupMetadata partitionGroupMetadata) { return new KinesisConsumer(new KinesisConfig(_streamConfig)); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java index f234bae..8f6b932 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java @@ -1,11 +1,15 @@ package org.apache.pinot.plugin.stream.kinesis; import java.io.IOException; +import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; +/** + * An implementation of the {@link PartitionGroupCheckpointFactory} for Kinesis stream + */ public class KinesisMsgOffsetFactory implements StreamPartitionMsgOffsetFactory { KinesisConfig _kinesisConfig; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java index b3eb626..83228ec 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java @@ -22,13 +22,14 @@ import java.nio.ByteBuffer; import java.util.HashMap; import 
java.util.List; import java.util.Map; -import javax.annotation.Nullable; import org.apache.pinot.spi.stream.MessageBatch; -import org.apache.pinot.spi.stream.RowMetadata; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import software.amazon.awssdk.services.kinesis.model.Record; +/** + * A {@link MessageBatch} for collecting records from the Kinesis stream + */ public class KinesisRecordsBatch implements MessageBatch<byte[]> { private final List<Record> _recordList; private final String _shardId; @@ -49,6 +50,7 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> { public byte[] getMessageAtIndex(int index) { return _recordList.get(index).data().asByteArray(); } + @Override public int getMessageOffsetAtIndex(int index) { return ByteBuffer.wrap(_recordList.get(index).data().asByteArray()).arrayOffset(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java index 8968b56..1083969 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java @@ -1,21 +1,14 @@ package org.apache.pinot.plugin.stream.kinesis; -import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import 
java.util.stream.Collectors; import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupConsumer; @@ -28,6 +21,9 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider; import software.amazon.awssdk.services.kinesis.model.Shard; +/** + * A {@link StreamMetadataProvider} implementation for the Kinesis stream + */ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { private final KinesisConnectionHandler _kinesisConnectionHandler; private final StreamConsumerFactory _kinesisStreamConsumerFactory; @@ -52,17 +48,23 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { throw new UnsupportedOperationException(); } + /** + * This call returns all active shards, taking into account the consumption status for those shards. + * PartitionGroupInfo is returned for a shard if: + * 1. It is a brand new shard i.e. no partitionGroupMetadata was found for it in the current list + * 2. It is still being actively consumed from i.e.
the consuming partition has not reached the end of the shard + */ @Override public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis) throws IOException, TimeoutException { - Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = - currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p)); + Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = currentPartitionGroupsMetadata.stream() + .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p)); List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>(); List<Shard> shards = _kinesisConnectionHandler.getShards(); - for (Shard shard : shards) { // go over all shards + for (Shard shard : shards) { KinesisCheckpoint newStartCheckpoint; String shardId = shard.shardId(); @@ -76,7 +78,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { } catch (Exception e) { // ignore. No end checkpoint yet for IN_PROGRESS segment } - if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment + if (currentEndCheckpoint != null) { // end checkpoint available i.e. 
committing/committed segment String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); if (endingSequenceNumber != null) { // shard has ended // check if segment has consumed all the messages already @@ -104,8 +106,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap); } - newPartitionGroupInfos - .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize())); + newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize())); } return newPartitionGroupInfos; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java index bae8832..b7a9dba 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java @@ -18,7 +18,12 @@ */ package org.apache.pinot.spi.stream; +/** + * Keeps track of the consumption for a PartitionGroup + */ public interface Checkpoint extends Comparable { + String serialize(); + Checkpoint deserialize(String checkpointStr); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java index 14d2f39..4bd7839 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java @@ -18,32 +18,22 @@ */ package org.apache.pinot.spi.stream; -import org.apache.pinot.spi.annotations.InterfaceStability; - - /** * An interface to be implemented by streams that are consumed using Pinot LLC consumption. */ -@InterfaceStability.Evolving public interface PartitionGroupCheckpointFactory { /** * Initialization, called once when the factory is created. 
- * @param streamConfig */ void init(StreamConfig streamConfig); /** - * Construct an offset from the string provided. - * @param offsetStr - * @return StreamPartitionMsgOffset + * Construct a checkpoint from the string provided. */ Checkpoint create(String offsetStr); /** * Construct a checkpoint from another one provided, of the same type. - * - * @param other - * @return */ Checkpoint create(Checkpoint other); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java index b421268..72b59d7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java @@ -22,7 +22,21 @@ import java.io.Closeable; import java.util.concurrent.TimeoutException; +/** + * Consumer interface for consuming from a partition group of a stream + */ public interface PartitionGroupConsumer extends Closeable { - MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) + + /** + * Fetch messages and offsets from the stream partition group + * + * @param startCheckpoint The offset of the first message desired, inclusive + * @param endCheckpoint The offset of the last message desired, exclusive, or null + * @param timeoutMs Timeout in milliseconds + * @throws java.util.concurrent.TimeoutException If the operation could not be completed within {@code timeoutMs} + * milliseconds + * @return An iterable containing messages fetched from the stream partition and their offsets + */ + MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs) throws TimeoutException; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java index 438e148..758953d 100644 ---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java @@ -18,21 +18,22 @@ */ package org.apache.pinot.spi.stream; +/** + * A PartitionGroup is a group of partitions/shards that the same consumer should consume from. + * This class is a container for the metadata of a partition group. It consists of + * 1. A unique partition group id for this partition group + * 2. The start checkpoint to begin consumption for this partition group + */ public class PartitionGroupInfo { - // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider) private final int _partitionGroupId; - private String _startCheckpoint; + private final String _startCheckpoint; public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) { _partitionGroupId = partitionGroupId; _startCheckpoint = startCheckpoint; } - public void setStartCheckpoint(String startCheckpoint) { - _startCheckpoint = startCheckpoint; - } - public int getPartitionGroupId() { return _partitionGroupId; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java index f2d3f17..9c746e8 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; /** - * Fetches the partition count of a stream using the {@link StreamMetadataProvider} + * Creates a list of PartitionGroupInfo for all partition groups of the stream using the {@link StreamMetadataProvider} */ public class PartitionGroupInfoFetcher implements Callable<Boolean> { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java 
index aaf20b6..a99a82b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -18,12 +18,8 @@ */ package org.apache.pinot.spi.stream; -import java.util.List; - - public class PartitionGroupMetadata { - // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider) private final int _partitionGroupId; private int _sequenceNumber; private String _startCheckpoint; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java index 3bedc8a..3f5b230 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java @@ -63,9 +63,9 @@ public interface PartitionLevelConsumer extends Closeable, PartitionGroupConsume return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis); } - default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMillis) + default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs) throws java.util.concurrent.TimeoutException { - // TODO Issue 5359 remove this default implementation once all kafka consumers have migrated to use this API - return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, timeoutMillis); + return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, + timeoutMs); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java deleted file mode 100644 index b92f04d..0000000 --- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream; - -import java.util.concurrent.Callable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Fetches the partition offset for a stream given the offset criteria, using the {@link StreamMetadataProvider} - */ -public class PartitionOffsetFetcher implements Callable<Boolean> { - - private static final Logger LOGGER = LoggerFactory.getLogger(PartitionOffsetFetcher.class); - private static final int STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS = 10000; - - private final String _topicName; - private final OffsetCriteria _offsetCriteria; - private final int _partitionGroupId; - - private Exception _exception = null; - private StreamPartitionMsgOffset _offset; - private StreamConsumerFactory _streamConsumerFactory; - StreamConfig _streamConfig; - - public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionGroupId, StreamConfig streamConfig) { - _offsetCriteria = offsetCriteria; - _partitionGroupId = partitionGroupId; - _streamConfig = streamConfig; - _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig); - _topicName = streamConfig.getTopicName(); - } - - public StreamPartitionMsgOffset getOffset() { - return _offset; - } - - public Exception getException() { - return _exception; - } - - /** - * Callable to fetch the offset of the partition given the stream metadata and offset criteria - * @return - * @throws Exception - */ - @Override - public Boolean call() - throws Exception { - String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionGroupId; - try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory - .createPartitionMetadataProvider(clientId, _partitionGroupId)) { - _offset = - streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria, STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS); - if (_exception != null) { - LOGGER.info("Successfully retrieved offset({}) for stream topic {} partition {}", _offset, _topicName, - _partitionGroupId); - } - return Boolean.TRUE; - } catch (TransientConsumerException e) { - LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName, - _partitionGroupId, - e.getMessage()); - _exception = e; - return Boolean.FALSE; - } catch (Exception e) { - _exception = e; - throw e; - } - } -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java index f993fed..ac928c5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java @@ -41,7 +41,6 @@ public abstract class StreamConsumerFactory { * @param partition the partition id of the partition for which this consumer is being created * @return */ - @Deprecated public abstract PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition); /** @@ -74,8 +73,11 @@ public abstract class 
StreamConsumerFactory { return new LongMsgOffsetFactory(); } - // creates a consumer which consumes from a partition group - public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { - return createPartitionLevelConsumer(clientId, metadata.getPartitionGroupId()); + /** + * Creates a partition group consumer, which can fetch messages from a partition group + */ + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, + PartitionGroupMetadata partitionGroupMetadata) { + return createPartitionLevelConsumer(clientId, partitionGroupMetadata.getPartitionGroupId()); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index be2e819..cecc708 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -60,7 +60,7 @@ public interface StreamMetadataProvider extends Closeable { } /** - * Fetch the partitionGroupMetadata list. + * Fetch the list of partition group info for the latest state of the stream * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition. */ default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig, @@ -69,14 +69,13 @@ public interface StreamMetadataProvider extends Closeable { int partitionCount = fetchPartitionCount(timeoutMillis); List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount); - // add a PartitionGroupInfo into the list foreach partition already present in current. - // the end checkpoint is set as checkpoint + // Add a PartitionGroupInfo into the list for each partition already present in current.
for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) { newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), currentPartitionGroupMetadata.getEndCheckpoint())); } - // add PartitiongroupInfo for new partitions - // use offset criteria from stream config + // Add PartitionGroupInfo for new partitions + // Use offset criteria from stream config StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) { StreamMetadataProvider partitionMetadataProvider = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org