Copilot commented on code in PR #16494:
URL: https://github.com/apache/pinot/pull/16494#discussion_r2292310679
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -431,6 +435,10 @@ public long getOffsetAutoResetTimeSecThreshold() {
return _offsetAutoResetTimeSecThreshold;
}
+ public Boolean isEphemeralBackfillTopic() {
+ return Boolean.TRUE.equals(_ephemeralBackfillTopic);
Review Comment:
The return statement uses Boolean.TRUE.equals(), which is more indirect than
necessary. _ephemeralBackfillTopic may be null and should then be treated as
false; an explicit null check states that intent more clearly:
return _ephemeralBackfillTopic != null && _ephemeralBackfillTopic;
```suggestion
return _ephemeralBackfillTopic != null && _ephemeralBackfillTopic;
```
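To make the trade-off concrete, here is a minimal stand-alone sketch (the class and method names are hypothetical; only the field name comes from the diff) that evaluates both forms for null, true and false. Both are null-safe and return the same result, so the choice is about readability only. The sketch returns a primitive boolean; the method in the diff is declared to return Boolean, although the value it produces is never null.

```java
// Hypothetical stand-alone class; only the field name mirrors the diff.
final class EphemeralFlagSketch {
  // Nullable wrapper, as the Boolean.TRUE.equals(...) call in the diff implies.
  private final Boolean _ephemeralBackfillTopic;

  EphemeralFlagSketch(Boolean ephemeralBackfillTopic) {
    _ephemeralBackfillTopic = ephemeralBackfillTopic;
  }

  // Form used in the PR: null-safe because equals() is invoked on the constant.
  boolean viaTrueEquals() {
    return Boolean.TRUE.equals(_ephemeralBackfillTopic);
  }

  // Form suggested in the review: the null check short-circuits before unboxing.
  boolean viaNullCheck() {
    return _ephemeralBackfillTopic != null && _ephemeralBackfillTopic;
  }

  public static void main(String[] args) {
    Boolean[] inputs = {null, Boolean.TRUE, Boolean.FALSE};
    for (Boolean input : inputs) {
      EphemeralFlagSketch sketch = new EphemeralFlagSketch(input);
      System.out.println(input + " -> " + sketch.viaTrueEquals() + " / " + sketch.viaNullCheck());
    }
  }
}
```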
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2352,16 +2417,17 @@ private Set<String> filterSegmentsToCommit(Set<String> allConsumingSegments,
return segmentsToCommit;
}
- // partitionGroupIdsToCommitStr != null
- Set<Integer> partitionsToCommit = Arrays.stream(partitionGroupIdsToCommitStr.split(","))
+ // partitionGroupInfosToCommitStr != null
+ Set<Integer> partitionsToCommit = Arrays.stream(partitionGroupInfosToCommitStr.split(","))
.map(String::trim)
.map(Integer::parseInt)
.collect(Collectors.toSet());
Set<String> targetSegments = allConsumingSegments.stream()
- .filter(segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupId()))
+ .filter(
+ segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupTopicAndId()))
Review Comment:
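The filter calls getPartitionGroupTopicAndId(), which returns a String, but
checks the result against partitionsToCommit, which holds the Integer values
parsed from the comma-separated string. Set.contains() takes an Object, so this
compiles, and with the HashSet that Collectors.toSet() returns in current JDKs it
does not throw a ClassCastException either. A String never equals an Integer, so
the check always fails and targetSegments ends up empty: no consuming segment is
ever selected for commit.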
```suggestion
segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupId()))
```
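A minimal stand-alone sketch of the failure mode, assuming 4-part segment names of the form table__partitionGroupId__sequenceNumber__creationTime. The helpers partitionGroupId() and stringKey() are hypothetical stand-ins for the LLCSegmentName getters, not Pinot APIs. Filtering on the Integer id selects the requested partitions, while filtering on a String key, even one that looks like "0", matches nothing and raises no exception.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class CommitFilterSketch {
  // Hypothetical stand-in for new LLCSegmentName(name).getPartitionGroupId(),
  // assuming the 4-part format table__partitionGroupId__sequenceNumber__creationTime.
  static int partitionGroupId(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  // Hypothetical stand-in for a getter that returns the partition key as a String.
  static String stringKey(String segmentName) {
    return String.valueOf(partitionGroupId(segmentName));
  }

  // Same parsing as the diff: comma-separated ids, trimmed, collected as Integers.
  static Set<Integer> parsePartitions(String csv) {
    return Arrays.stream(csv.split(","))
        .map(String::trim)
        .map(Integer::parseInt)
        .collect(Collectors.toSet());
  }

  // Correct: the int id is autoboxed to Integer and looked up in a Set<Integer>.
  static List<String> filterById(List<String> segments, Set<Integer> partitions) {
    return segments.stream()
        .filter(name -> partitions.contains(partitionGroupId(name)))
        .collect(Collectors.toList());
  }

  // Buggy: contains(Object) accepts the String, but it never equals an Integer element.
  static List<String> filterByStringKey(List<String> segments, Set<Integer> partitions) {
    return segments.stream()
        .filter(name -> partitions.contains(stringKey(name)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> segments = List.of(
        "myTable__0__5__20250101T0000Z",
        "myTable__1__3__20250101T0000Z",
        "myTable__2__7__20250101T0000Z");
    Set<Integer> partitions = parsePartitions("0, 2");
    System.out.println("by id:         " + filterById(segments, partitions));
    System.out.println("by String key: " + filterByStringKey(segments, partitions));
  }
}
```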
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -38,25 +38,63 @@ public class LLCSegmentName implements Comparable<LLCSegmentName> {
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
+ @Nullable
+ private final String _topicName;
public LLCSegmentName(String segmentName) {
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
- Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName);
+ // Validate the segment name format should have 4 or 5 parts:
+ // e.g. tableName__partitionGroupId__sequenceNumber__creationTime
+ // or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
+ Preconditions.checkArgument(
+ parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: %s", segmentName);
_tableName = parts[0];
- _partitionGroupId = Integer.parseInt(parts[1]);
- _sequenceNumber = Integer.parseInt(parts[2]);
- _creationTime = parts[3];
+ if (parts.length == 4) {
+ _topicName = null;
+ _partitionGroupId = Integer.parseInt(parts[1]);
+ _sequenceNumber = Integer.parseInt(parts[2]);
+ _creationTime = parts[3];
+ } else {
+ _topicName = parts[1];
+ _partitionGroupId = Integer.parseInt(parts[2]);
+ _sequenceNumber = Integer.parseInt(parts[3]);
+ _creationTime = parts[4];
+ }
_segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
+ this(tableName, null, partitionGroupId, sequenceNumber, msSinceEpoch);
+ }
+
+ public LLCSegmentName(
+ String tableName, @Nullable String topicName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName);
+ Preconditions.checkArgument(topicName == null || !topicName.contains(SEPARATOR),
+ "Illegal topic name: %s", tableName);
_tableName = tableName;
+ _topicName = topicName;
_partitionGroupId = partitionGroupId;
_sequenceNumber = sequenceNumber;
// ISO8601 date: 20160120T1234Z
_creationTime = DATE_FORMATTER.print(msSinceEpoch);
- _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
+ if (topicName == null) {
+ _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
+ } else {
+ _segmentName =
+ tableName + SEPARATOR + topicName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR
+ + _creationTime;
+ }
+ }
+
+ private LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, String creationTime,
+ String segmentName) {
+ _tableName = tableName;
+ _topicName = "";
Review Comment:
The private constructor sets _topicName to an empty string instead of null,
which is inconsistent with the null check pattern used elsewhere in the class.
This could cause issues with the getPartitionGroupTopicAndId() method logic.
```suggestion
_topicName = null;
```
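The null-versus-empty question is easier to see with the parsing and formatting rules side by side. Below is a simplified, hypothetical stand-in for LLCSegmentName (not Pinot's class): creationTime is kept as a plain string instead of being formatted from epoch millis, and String.split replaces StringUtils.splitByWholeSeparator. A null topicName is the only way to express "no topic", so a 4-part name round-trips to a 4-part name and a 5-part name keeps its topic. Rejecting an empty topic name is a design choice of this sketch, made so that "" can never stand in for null.

```java
// Hypothetical, simplified stand-in for LLCSegmentName; not Pinot's class.
final class SegmentNameSketch {
  static final String SEPARATOR = "__";

  final String tableName;
  final String topicName; // null means "no topic", i.e. the 4-part format
  final int partitionGroupId;
  final int sequenceNumber;
  final String creationTime; // kept verbatim, e.g. "20250101T0000Z"

  SegmentNameSketch(String tableName, String topicName, int partitionGroupId, int sequenceNumber,
      String creationTime) {
    if (tableName.contains(SEPARATOR)) {
      throw new IllegalArgumentException("Illegal table name: " + tableName);
    }
    // Design choice of this sketch: "" is rejected so that only null can mean "no topic".
    if (topicName != null && (topicName.isEmpty() || topicName.contains(SEPARATOR))) {
      throw new IllegalArgumentException("Illegal topic name: " + topicName);
    }
    this.tableName = tableName;
    this.topicName = topicName;
    this.partitionGroupId = partitionGroupId;
    this.sequenceNumber = sequenceNumber;
    this.creationTime = creationTime;
  }

  // Accepts table__partition__sequence__time or table__topic__partition__sequence__time.
  static SegmentNameSketch parse(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR, -1);
    if (parts.length == 4) {
      return new SegmentNameSketch(parts[0], null, Integer.parseInt(parts[1]),
          Integer.parseInt(parts[2]), parts[3]);
    }
    if (parts.length == 5) {
      return new SegmentNameSketch(parts[0], parts[1], Integer.parseInt(parts[2]),
          Integer.parseInt(parts[3]), parts[4]);
    }
    throw new IllegalArgumentException("Invalid LLC segment name: " + segmentName);
  }

  // The topic part is emitted only when topicName is non-null.
  String format() {
    String prefix = topicName == null ? tableName : tableName + SEPARATOR + topicName;
    return prefix + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + creationTime;
  }

  public static void main(String[] args) {
    System.out.println(parse("myTable__3__12__20250101T0000Z").format());
    System.out.println(parse("myTable__backfillTopic__3__12__20250101T0000Z").format());
  }
}
```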
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
+ topicName;
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
int index = i;
+ int finalPermanentTopicIndex = permanentTopicIndex;
+ // For permanent topics, we use the index of the stream config to get the partition group consumption status.
+ // For ephemeral backfill topics, we use the topic name to filter the partition group consumption status.
List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
+ .filter(partitionGroupConsumptionStatus -> _streamConfigs.get(index).isEphemeralBackfillTopic()
+ ? _streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
+ : IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+ partitionGroupConsumptionStatus.getPartitionGroupId()) == finalPermanentTopicIndex)
.collect(Collectors.toList());
try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
+ // Similarly, for ephemeral backfill topics, we create the partition group metadata with the topic name.
_newPartitionGroupMetadataList.addAll(
streamMetadataProvider.computePartitionGroupMetadata(clientId,
- streamConfig, topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
+ _streamConfigs.get(i), topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
_forceGetOffsetFromStream)
.stream()
.map(metadata -> new PartitionGroupMetadata(
- IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
- index), metadata.getStartOffset()))
+ _streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic() ? _streamConfigs.get(index)
+ .getTopicName() : "",
+ _streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic()
Review Comment:
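The condition logic is incorrect. It checks whether the stream config at
finalPermanentTopicIndex is ephemeral, but it should check the stream config at
index, which is the config being processed in this iteration and the one the
filter above already consults. Whenever the two indices point at different
stream configs, the arguments passed to the new PartitionGroupMetadata (such as
the topic name) can be chosen for the wrong kind of topic.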
```suggestion
_streamConfigs.get(index).isEphemeralBackfillTopic() ? _streamConfigs.get(index)
.getTopicName() : "",
_streamConfigs.get(index).isEphemeralBackfillTopic()
```
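A minimal sketch of the rule the comment asks for: every ephemeral-or-permanent decision reads the stream config at index, and the permanent-topic index is used only to match permanent topics by partition id. Config and Status are hypothetical, simplified stand-ins for StreamConfig and PartitionGroupConsumptionStatus, and the encoding pinotPartitionId / 10_000 is an assumption made for illustration; the real IngestionConfigUtils mapping may differ.

```java
import java.util.List;
import java.util.stream.Collectors;

final class StatusFilterSketch {
  // Hypothetical, simplified stand-ins for StreamConfig and PartitionGroupConsumptionStatus.
  record Config(String topicName, boolean ephemeralBackfill) {}
  record Status(String topicName, int pinotPartitionId) {}

  // ASSUMPTION (illustration only): a permanent topic's stream-config index is
  // encoded in the Pinot partition id as pinotPartitionId / 10_000.
  static final int PARTITION_PADDING = 10_000;

  static int streamConfigIndex(int pinotPartitionId) {
    return pinotPartitionId / PARTITION_PADDING;
  }

  // Single source of truth: always read the config at `index`, the one being processed.
  static boolean isEphemeral(List<Config> configs, int index) {
    return configs.get(index).ephemeralBackfill();
  }

  // Ephemeral topics match by topic name; permanent topics match by encoded stream-config index.
  static List<Status> statusesFor(List<Config> configs, int index, int permanentTopicIndex,
      List<Status> all) {
    Config current = configs.get(index);
    return all.stream()
        .filter(s -> isEphemeral(configs, index)
            ? current.topicName().equals(s.topicName())
            : streamConfigIndex(s.pinotPartitionId()) == permanentTopicIndex)
        .collect(Collectors.toList());
  }

  // Topic key for new metadata: the topic name for ephemeral topics, "" otherwise,
  // mirroring the ternary in the suggestion, keyed on `index`.
  static String topicKeyFor(List<Config> configs, int index) {
    return isEphemeral(configs, index) ? configs.get(index).topicName() : "";
  }

  public static void main(String[] args) {
    List<Config> configs = List.of(
        new Config("events", false), new Config("clicks", false), new Config("events_backfill", true));
    List<Status> all = List.of(
        new Status("events", 0), new Status("events", 1),
        new Status("clicks", 10_000), new Status("events_backfill", 0));
    System.out.println(statusesFor(configs, 1, 1, all));  // statuses of the "clicks" topic only
    System.out.println(statusesFor(configs, 2, 99, all)); // permanentTopicIndex is ignored here
    System.out.println("[" + topicKeyFor(configs, 2) + "] [" + topicKeyFor(configs, 0) + "]");
  }
}
```

Because isEphemeral() is the only place that decision is made, the consumption-status filter and the metadata key cannot disagree about which kind of topic is being processed.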
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]