Jackie-Jiang commented on code in PR #16494:
URL: https://github.com/apache/pinot/pull/16494#discussion_r2292310357
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements
Comparable<LLCSegmentName> {
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
+ @Nullable
+ private final String _topicName;
public LLCSegmentName(String segmentName) {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
- Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name:
%s", segmentName);
+ // Validate the segment name format should have 4 or 5 parts:
+ // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+ // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+ Preconditions.checkArgument(
+ parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name:
%s", segmentName);
_tableName = parts[0];
- _partitionGroupId = Integer.parseInt(parts[1]);
- _sequenceNumber = Integer.parseInt(parts[2]);
- _creationTime = parts[3];
+ if (parts.length == 4) {
+ _topicName = null;
+ _partitionGroupId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ } else {
+ _topicName = parts[1];
+ _partitionGroupId = Integer.parseInt(parts[2]);
+ _sequenceNumber = Integer.parseInt(parts[3]);
+ _creationTime = parts[4];
+ }
_segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
+ this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+ }
+
+ public LLCSegmentName(
+ String tableName, @Nullable String topicName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table
name: %s", tableName);
+ Preconditions.checkArgument(topicName == null ||
!topicName.contains(SEPARATOR),
Review Comment:
Consider also checking that it is not an empty string.
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements
Comparable<LLCSegmentName> {
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
+ @Nullable
+ private final String _topicName;
public LLCSegmentName(String segmentName) {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
- Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name:
%s", segmentName);
+ // Validate the segment name format should have 4 or 5 parts:
+ // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+ // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+ Preconditions.checkArgument(
+ parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name:
%s", segmentName);
_tableName = parts[0];
- _partitionGroupId = Integer.parseInt(parts[1]);
- _sequenceNumber = Integer.parseInt(parts[2]);
- _creationTime = parts[3];
+ if (parts.length == 4) {
+ _topicName = null;
+ _partitionGroupId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ } else {
+ _topicName = parts[1];
+ _partitionGroupId = Integer.parseInt(parts[2]);
+ _sequenceNumber = Integer.parseInt(parts[3]);
+ _creationTime = parts[4];
+ }
_segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
+ this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+ }
+
+ public LLCSegmentName(
+ String tableName, @Nullable String topicName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table
name: %s", tableName);
+ Preconditions.checkArgument(topicName == null ||
!topicName.contains(SEPARATOR),
+ "Illegal topic name: %s", tableName);
Review Comment:
```suggestion
"Illegal topic name: %s", topicName);
```
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements
Comparable<LLCSegmentName> {
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
+ @Nullable
+ private final String _topicName;
public LLCSegmentName(String segmentName) {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
- Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name:
%s", segmentName);
+ // Validate the segment name format should have 4 or 5 parts:
+ // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+ // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+ Preconditions.checkArgument(
+ parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name:
%s", segmentName);
_tableName = parts[0];
- _partitionGroupId = Integer.parseInt(parts[1]);
- _sequenceNumber = Integer.parseInt(parts[2]);
- _creationTime = parts[3];
+ if (parts.length == 4) {
+ _topicName = null;
+ _partitionGroupId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ } else {
+ _topicName = parts[1];
+ _partitionGroupId = Integer.parseInt(parts[2]);
+ _sequenceNumber = Integer.parseInt(parts[3]);
+ _creationTime = parts[4];
+ }
_segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
+ this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+ }
+
+ public LLCSegmentName(
+ String tableName, @Nullable String topicName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table
name: %s", tableName);
+ Preconditions.checkArgument(topicName == null ||
!topicName.contains(SEPARATOR),
+ "Illegal topic name: %s", tableName);
_tableName = tableName;
+ _topicName = topicName;
_partitionGroupId = partitionGroupId;
_sequenceNumber = sequenceNumber;
// ISO8601 date: 20160120T1234Z
_creationTime = DATE_FORMATTER.print(msSinceEpoch);
- _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR +
sequenceNumber + SEPARATOR + _creationTime;
+ if (topicName == null) {
+ _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR +
sequenceNumber + SEPARATOR + _creationTime;
+ } else {
+ _segmentName =
+ tableName + SEPARATOR + topicName + SEPARATOR + partitionGroupId +
SEPARATOR + sequenceNumber + SEPARATOR
+ + _creationTime;
+ }
+ }
+
+ private LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, String creationTime,
Review Comment:
Remove this constructor
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements
Comparable<LLCSegmentName> {
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
+ @Nullable
+ private final String _topicName;
public LLCSegmentName(String segmentName) {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
- Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name:
%s", segmentName);
+ // Validate the segment name format should have 4 or 5 parts:
+ // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+ // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+ Preconditions.checkArgument(
+ parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name:
%s", segmentName);
_tableName = parts[0];
- _partitionGroupId = Integer.parseInt(parts[1]);
- _sequenceNumber = Integer.parseInt(parts[2]);
- _creationTime = parts[3];
+ if (parts.length == 4) {
+ _topicName = null;
+ _partitionGroupId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ } else {
+ _topicName = parts[1];
+ _partitionGroupId = Integer.parseInt(parts[2]);
+ _sequenceNumber = Integer.parseInt(parts[3]);
+ _creationTime = parts[4];
+ }
_segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
+ this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+ }
+
+ public LLCSegmentName(
Review Comment:
(minor, format) The changes don't follow [Pinot
Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide).
Can you set it up and reformat the changes?
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -76,13 +118,7 @@ public static LLCSegmentName of(String segmentName) {
* Returns whether the given segment name represents an LLC segment.
*/
public static boolean isLLCSegment(String segmentName) {
- int numSeparators = 0;
- int index = 0;
- while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
- numSeparators++;
- index += 2; // SEPARATOR.length()
- }
- return numSeparators == 3;
+ return of(segmentName) != null;
Review Comment:
Let's only change the last check to
`numSeparators == 3 || numSeparators == 4`, as that is more efficient.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2776,54 +2842,4 @@ public List<String> getCommittingSegments(String
realtimeTableName) {
private List<String> getCommittingSegments(String realtimeTableName,
Collection<String> segmentsToCheck) {
return getCommittingSegments(realtimeTableName, segmentsToCheck,
_helixResourceManager::getSegmentZKMetadata);
}
-
- public static List<String> getCommittingSegments(String realtimeTableName,
Review Comment:
Several methods/comments have been moved. What's the reason? These methods
seem to fit better in their original positions.
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -127,6 +181,11 @@ public String getSegmentName() {
public int compareTo(LLCSegmentName other) {
Preconditions.checkArgument(_tableName.equals(other._tableName),
"Cannot compare segment names from different table: %s, %s",
_segmentName, other.getSegmentName());
+ String thisTopicName = _topicName == null ? "" : _topicName;
+ String otherTopicName = other._topicName == null ? "" : other._topicName;
+ if (!thisTopicName.equals(otherTopicName)) {
+ return StringUtils.compare(_topicName, other._topicName);
+ }
Review Comment:
Do you want to compare the topic name here? What is the side effect of
comparing it?
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -65,6 +103,10 @@ public LLCSegmentName(String tableName, int
partitionGroupId, int sequenceNumber
*/
@Nullable
public static LLCSegmentName of(String segmentName) {
+ String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+ if (parts.length < 4 || parts.length > 5) {
+ return null;
+ }
Review Comment:
Let's not modify this; it adds overhead.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
+ topicName;
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
int index = i;
+ int finalPermanentTopicIndex = permanentTopicIndex;
+ // For permanent topics, we use the index of the stream config to get
the partition group consumption status.
+ // For ephemeral backfill topics, we use the topic name to filter the
partition group consumption status.
List<PartitionGroupConsumptionStatus>
topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId()) ==
index)
+ .filter(partitionGroupConsumptionStatus ->
_streamConfigs.get(index).isEphemeralBackfillTopic()
Review Comment:
Can we move this check to
`PinotLLCRealtimeSegmentManager.setupNewPartitionGroup()`? Currently it is
deeply nested, which makes it very hard to follow (I missed it several times
and only found this rewrite in the end).
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1833,29 +1896,31 @@ private StreamPartitionMsgOffset
selectStartOffset(OffsetCriteria offsetCriteria
}
private LLCSegmentName getNextLLCSegmentName(LLCSegmentName
lastLLCSegmentName, long creationTimeMs) {
- return new LLCSegmentName(lastLLCSegmentName.getTableName(),
lastLLCSegmentName.getPartitionGroupId(),
- lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
+ return new LLCSegmentName(lastLLCSegmentName.getTableName(),
lastLLCSegmentName.getTopicName(),
+ lastLLCSegmentName.getPartitionGroupId(),
lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
}
/**
* Sets up a new partition group.
* <p>Persists the ZK metadata for the first CONSUMING segment, and returns
the segment name.
*/
- private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig
streamConfig,
+ private String setupNewPartitionGroup(TableConfig tableConfig,
List<StreamConfig> streamConfigs,
Review Comment:
I don't follow this change. What does it mean to set up a new partition group
on multiple `StreamConfig`s? Which `StreamConfig` should we reference when
setting it up?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
+ topicName;
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
int index = i;
+ int finalPermanentTopicIndex = permanentTopicIndex;
+ // For permanent topics, we use the index of the stream config to get
the partition group consumption status.
+ // For ephemeral backfill topics, we use the topic name to filter the
partition group consumption status.
List<PartitionGroupConsumptionStatus>
topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId()) ==
index)
+ .filter(partitionGroupConsumptionStatus ->
_streamConfigs.get(index).isEphemeralBackfillTopic()
+ ?
_streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
+ :
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+ partitionGroupConsumptionStatus.getPartitionGroupId())
== finalPermanentTopicIndex)
.collect(Collectors.toList());
try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
+ // Similarly, for ephemeral backfill topics, we create the partition
group metadata with the topic name.
_newPartitionGroupMetadataList.addAll(
streamMetadataProvider.computePartitionGroupMetadata(clientId,
- streamConfig, topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000,
+ _streamConfigs.get(i),
topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
_forceGetOffsetFromStream)
.stream()
.map(metadata -> new PartitionGroupMetadata(
-
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
- index), metadata.getStartOffset()))
+
_streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic() ?
_streamConfigs.get(index)
+ .getTopicName() : "",
Review Comment:
Is this a bug? Will it break if we use `""` as the topic name? Is there a
test for this?
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
+ topicName;
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
int index = i;
+ int finalPermanentTopicIndex = permanentTopicIndex;
+ // For permanent topics, we use the index of the stream config to get
the partition group consumption status.
+ // For ephemeral backfill topics, we use the topic name to filter the
partition group consumption status.
List<PartitionGroupConsumptionStatus>
topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId()) ==
index)
+ .filter(partitionGroupConsumptionStatus ->
_streamConfigs.get(index).isEphemeralBackfillTopic()
+ ?
_streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
Review Comment:
Do we need this check?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]