KKcorps commented on code in PR #15957: URL: https://github.com/apache/pinot/pull/15957#discussion_r2131634314
########## pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java: ########## @@ -18,49 +18,51 @@ */ package org.apache.pinot.spi.stream; -import org.apache.pinot.spi.utils.IngestionConfigUtils; - - /** * A PartitionGroup is a group of partitions/shards that the same consumer should consume from. * This class contains all information which describes the latest state of a partition group. * It is constructed by looking at the segment zk metadata of the latest segment of each partition group. * It consists of: * 1. partitionGroupId - A unique ID for the partitionGroup - * 2. sequenceNumber - The sequenceNumber this partitionGroup is currently at - * 3. startOffset - The start offset that the latest segment started consuming from - * 4. endOffset - The endOffset (if segment consuming from this partition group has finished consuming the segment + * 2. streamPartitionId - Partition ID of the stream that this partitionGroup belongs to. + * 3. sequenceNumber - The sequenceNumber this partitionGroup is currently at + * 4. startOffset - The start offset that the latest segment started consuming from + * 5. endOffset - The endOffset (if segment consuming from this partition group has finished consuming the segment * and recorded the end * offset) - * 5. status - the consumption status IN_PROGRESS/DONE + * 6. status - the consumption status IN_PROGRESS/DONE * * This information is needed by the stream, when grouping the partitions/shards into new partition groups. 
*/ public class PartitionGroupConsumptionStatus { - private final int _partitionGroupId; - private final int _streamPartitionGroupId; + private final int _streamPartitionId; private int _sequenceNumber; private StreamPartitionMsgOffset _startOffset; private StreamPartitionMsgOffset _endOffset; private String _status; - public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber, StreamPartitionMsgOffset startOffset, - StreamPartitionMsgOffset endOffset, String status) { + public PartitionGroupConsumptionStatus(int partitionGroupId, int streamPartitionId, int sequenceNumber, + StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, String status) { _partitionGroupId = partitionGroupId; - _streamPartitionGroupId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId); + _streamPartitionId = streamPartitionId; _sequenceNumber = sequenceNumber; _startOffset = startOffset; _endOffset = endOffset; _status = status; } + public PartitionGroupConsumptionStatus(int partitionGroupId, int sequenceNumber, StreamPartitionMsgOffset startOffset, + StreamPartitionMsgOffset endOffset, String status) { + this(partitionGroupId, partitionGroupId, sequenceNumber, startOffset, endOffset, status); + } + public int getPartitionGroupId() { return _partitionGroupId; } public int getStreamPartitionGroupId() { Review Comment: nit: we should rename the method to `getStreamPartitionId` as well, since we are changing the variable name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org