lnbest0707-uber commented on code in PR #16492:
URL: https://github.com/apache/pinot/pull/16492#discussion_r2264065562
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -796,9 +796,10 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
// segment. For tables with pauseless mode enabled, this size is unavailable at this step because the
// segment has not yet been built.
- createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs,
+ createNewSegmentZKMetadataWithOffsetAutoReset(tableConfig, streamConfigs.get(0), newLLCSegment,
+ newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
- numReplicas);
+ numReplicas, false);
Review Comment:
Revised as suggested
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,78 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
+ if (!streamConfig.isEnableOffsetAutoReset()) {
+ return nextOffset;
Review Comment:
If offset auto reset is enabled, the log
```
LOGGER.info("Latest offset of topic {} and partition {} is {}",
streamConfig.getTopicName(), partitionId, latestOffset);
```
will be printed.
Then, right after this method returns,
```
LOGGER.info(
    "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
    segmentName, startOffset, creationTimeMs);
```
will be printed as well.
With those two logs, we should be able to tell whether autoReset is enabled
or not.
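
To illustrate the point about the two logs, here is a minimal, self-contained sketch. It is not the PR code: `OffsetAutoResetLogSketch`, `createSegment`, `autoResetEnabledFor`, and the in-memory `LOG` list are hypothetical stand-ins, the flag is passed as a plain boolean instead of coming from `StreamConfig`, and the actual reset decision (not shown in this hunk) is left out, so the sketch always returns `nextOffset`. It only shows that a "Latest offset" line directly before the "Creating segment ZK metadata" line implies the flag was on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names are Pinot APIs.
class OffsetAutoResetLogSketch {
  // Captured log lines stand in for LOGGER.info output.
  static final List<String> LOG = new ArrayList<>();

  // Mirrors the early return in the diff: when the flag is off, nothing is logged here.
  static String computeStartOffset(String nextOffset, boolean autoResetEnabled, String topic,
      int partitionId, String latestOffset) {
    if (!autoResetEnabled) {
      return nextOffset;
    }
    LOG.add(String.format("Latest offset of topic %s and partition %d is %s",
        topic, partitionId, latestOffset));
    // Assumption: the real reset decision is omitted; this sketch keeps nextOffset.
    return nextOffset;
  }

  // Computes the start offset, then logs the creation line, in the same order as the PR.
  static void createSegment(String segmentName, String nextOffset, boolean autoResetEnabled,
      String topic, int partitionId, String latestOffset, long creationTimeMs) {
    String startOffset =
        computeStartOffset(nextOffset, autoResetEnabled, topic, partitionId, latestOffset);
    LOG.add(String.format(
        "Creating segment ZK metadata for new CONSUMING segment: %s with start offset: %s"
            + " and creation time: %d", segmentName, startOffset, creationTimeMs));
  }

  // True when a "Latest offset" line directly precedes the creation line for segmentName.
  static boolean autoResetEnabledFor(String segmentName) {
    String marker = "new CONSUMING segment: " + segmentName + " ";
    for (int i = 0; i < LOG.size(); i++) {
      if (LOG.get(i).contains(marker)) {
        return i > 0 && LOG.get(i - 1).startsWith("Latest offset of topic");
      }
    }
    return false;
  }

  public static void main(String[] args) {
    createSegment("tbl__0__1__t", "100", false, "topicA", 0, "250", 1L);
    createSegment("tbl__0__2__t", "100", true, "topicA", 0, "250", 2L);
    System.out.println(autoResetEnabledFor("tbl__0__1__t"));
    System.out.println(autoResetEnabledFor("tbl__0__2__t"));
  }
}
```

In a real log stream the two lines may be interleaved with output from other threads, so matching on topic and partition would likely be more robust than relying on adjacency.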
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]