9aman commented on code in PR #16492:
URL: https://github.com/apache/pinot/pull/16492#discussion_r2262077528
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,78 @@ private void createNewSegmentZKMetadata(TableConfig
tableConfig, StreamConfig st
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
+ /**
+ * Creates and persists segment ZK metadata for the new CONSUMING segment.
+ */
+ private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig
tableConfig, StreamConfig streamConfig,
+ LLCSegmentName newLLCSegmentName, long creationTimeMs,
CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata,
InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas, boolean skipAutoReset) {
+ String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+ String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+ oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata,
instancePartitions, numPartitions, numReplicas,
+ startOffset);
+ }
+
+ private String computeStartOffset(String nextOffset, StreamConfig
streamConfig, int partitionId) {
+ if (!streamConfig.isEnableOffsetAutoReset()) {
+ return nextOffset;
+ }
+ long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+ int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+ if (timeThreshold <= 0 && offsetThreshold <= 0) {
+ LOGGER.warn("Invalid offset auto reset configuration for table: {},
topic: {}. "
+ + "timeThreshold: {}, offsetThreshold: {}",
+ streamConfig.getTableNameWithType(), streamConfig.getTopicName(),
timeThreshold, offsetThreshold);
+ return nextOffset;
+ }
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
Review Comment:
Can we move this to a separate function, `getLargestOffset`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]