lnbest0707-uber commented on code in PR #16492:
URL: https://github.com/apache/pinot/pull/16492#discussion_r2264065562


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -796,9 +796,10 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
         //  segment. For tables with pauseless mode enabled, this size is unavailable at this step because the
         //  segment has not yet been built.
 
-        createNewSegmentZKMetadata(tableConfig, streamConfigs.get(0), newLLCSegment, newSegmentCreationTimeMs,
+        createNewSegmentZKMetadataWithOffsetAutoReset(tableConfig, streamConfigs.get(0), newLLCSegment,
+            newSegmentCreationTimeMs,
             committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
-            numReplicas);
+            numReplicas, false);

Review Comment:
   Revised as suggested



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +962,78 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
 
+  /**
+   * Creates and persists segment ZK metadata for the new CONSUMING segment.
+   */
+  private void createNewSegmentZKMetadataWithOffsetAutoReset(TableConfig tableConfig, StreamConfig streamConfig,
+      LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
+      @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions, int numPartitions,
+      int numReplicas, boolean skipAutoReset) {
+    String oldStartOffset = committingSegmentDescriptor.getNextOffset();
+    String startOffset = skipAutoReset ? oldStartOffset : computeStartOffset(
+        oldStartOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
+    createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
+        committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+        startOffset);
+  }
+
+  private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
+    if (!streamConfig.isEnableOffsetAutoReset()) {
+      return nextOffset;

Review Comment:
   If auto reset is enabled, the following log is printed:
   ```
   LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId, latestOffset);
   ```
   Right after this method returns, this log is printed as well:
   ```
   LOGGER.info(
       "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
       segmentName, startOffset, creationTimeMs);
   ```
   Since the first log appears only when auto reset is enabled, these two logs together should be enough to tell whether autoReset is enabled or not.
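
   To illustrate the point about the two logs, here is a minimal, self-contained sketch of the control flow. It is not the Pinot implementation: the class `OffsetAutoResetSketch`, the `LOGS` list (standing in for `LOGGER`), the use of `long` offsets instead of `String`, and the lag rule with `maxLagAssumed` are all assumptions made for the example. Only the early return when auto reset is disabled, the `skipAutoReset` flag, and the two log messages come from the diff and the comment above.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for the flow discussed in this thread; not Pinot code.
   final class OffsetAutoResetSketch {
     // Log lines are collected in a list so the example has no logging dependency.
     static final List<String> LOGS = new ArrayList<>();

     // Same shape as computeStartOffset in the diff: returns early, without logging,
     // when auto reset is disabled. The lag rule below is an assumption.
     static long computeStartOffset(long nextOffset, boolean autoResetEnabled, long latestOffset,
         long maxLagAssumed, String topic, int partitionId) {
       if (!autoResetEnabled) {
         return nextOffset;
       }
       LOGS.add(String.format("Latest offset of topic %s and partition %d is %d",
           topic, partitionId, latestOffset));
       // Assumed policy: jump to the latest offset only when the lag is too large.
       return (latestOffset - nextOffset > maxLagAssumed) ? latestOffset : nextOffset;
     }

     // Same shape as createNewSegmentZKMetadataWithOffsetAutoReset: the second log
     // is always emitted, whichever way the start offset was chosen.
     static long createNewSegment(String segmentName, long nextOffset, boolean skipAutoReset,
         boolean autoResetEnabled, long latestOffset, long maxLagAssumed, String topic,
         int partitionId, long creationTimeMs) {
       long startOffset = skipAutoReset
           ? nextOffset
           : computeStartOffset(nextOffset, autoResetEnabled, latestOffset, maxLagAssumed, topic, partitionId);
       LOGS.add(String.format(
           "Creating segment ZK metadata for new CONSUMING segment: %s with start offset: %d and creation time: %d",
           segmentName, startOffset, creationTimeMs));
       return startOffset;
     }
   }
   ```

   In this sketch, a run with auto reset disabled (or with `skipAutoReset` set) leaves only the "Creating segment ZK metadata" line in `LOGS`, while a run with auto reset enabled leaves the "Latest offset" line followed by it.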



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

