lnbest0707-uber commented on code in PR #16492:
URL: https://github.com/apache/pinot/pull/16492#discussion_r2268636788


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
 
+  private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
+    if (!streamConfig.isEnableOffsetAutoReset()) {
+      return nextOffset;
+    }
+    long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+    int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+    if (timeThreshold <= 0 && offsetThreshold <= 0) {
+      LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. "
+              + "timeThreshold: {}, offsetThreshold: {}",
+          streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold);
+      return nextOffset;
+    }
+    String clientId =
+        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+            + streamConfig.getTopicName();
+    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory();
+    StreamPartitionMsgOffset nextOffsetWithType = offsetFactory.create(nextOffset);
+    StreamPartitionMsgOffset offsetAtSLA;
+    StreamPartitionMsgOffset latestOffset;
+    try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId,
+        partitionId)) {
+      // Fetching timestamp from an offset is an expensive operation which requires reading the data,
+      // while fetching offset from timestamp is lightweight and only needs to read metadata.
+      // Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
+      // (CurrentTime - SLA)'s offset > nextOffset.
+      // TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to
+      // get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
+      latestOffset = metadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+      LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+          latestOffset);
+      offsetAtSLA =

Review Comment:
   Right now, the check happens when the offset is set. It makes sense to do it
in advance to avoid an unnecessary call.
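
A minimal sketch of checking the threshold before the lookup, under simplifying assumptions: offsets are plain `long` values, and `OffsetAutoResetSketch` and the `offsetAtTimestamp` parameter are hypothetical stand-ins for the PR's `computeStartOffset` and the metadata provider call. The reset condition follows the one stated in the code comment of the diff, and the reset target follows the Javadoc ("skip to the latest offset"); it is not the PR's actual implementation.

```java
import java.util.function.LongUnaryOperator;

final class OffsetAutoResetSketch {
  private OffsetAutoResetSketch() {
  }

  /**
   * Hypothetical simplification of computeStartOffset: offsets are plain longs and
   * offsetAtTimestamp stands in for the stream metadata lookup.
   */
  static long computeStartOffset(long nextOffset, long latestOffset, long timeThresholdSec, long nowMs,
      LongUnaryOperator offsetAtTimestamp) {
    // Check the threshold first so a disabled time threshold never triggers the lookup.
    if (timeThresholdSec <= 0) {
      return nextOffset;
    }
    long offsetAtSla = offsetAtTimestamp.applyAsLong(nowMs - timeThresholdSec * 1000L);
    // Condition from the code comment in the diff: (CurrentTime - SLA)'s offset > nextOffset.
    return offsetAtSla > nextOffset ? latestOffset : nextOffset;
  }

  public static void main(String[] args) {
    int[] lookups = {0};
    // Fake lookup that counts how often it is called and always answers offset 500.
    LongUnaryOperator lookup = timestampMs -> {
      lookups[0]++;
      return 500L;
    };
    long disabled = computeStartOffset(100L, 900L, 0L, 1_000_000L, lookup);
    long lagging = computeStartOffset(100L, 900L, 60L, 1_000_000L, lookup);
    System.out.println(disabled + " " + lagging + " lookups=" + lookups[0]);
  }
}
```

With the guard placed first, the call with a non-positive threshold returns `nextOffset` without touching the lookup at all.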



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
 
+  private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
+    if (!streamConfig.isEnableOffsetAutoReset()) {
+      return nextOffset;
+    }
+    long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+    int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+    if (timeThreshold <= 0 && offsetThreshold <= 0) {
+      LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. "
+              + "timeThreshold: {}, offsetThreshold: {}",
+          streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold);
+      return nextOffset;
+    }
+    String clientId =

Review Comment:
   Good point.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
 
+  private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
+    if (!streamConfig.isEnableOffsetAutoReset()) {
+      return nextOffset;
+    }
+    long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+    int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+    if (timeThreshold <= 0 && offsetThreshold <= 0) {
+      LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. "
+              + "timeThreshold: {}, offsetThreshold: {}",
+          streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold);
+      return nextOffset;
+    }
+    String clientId =
+        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+            + streamConfig.getTopicName();
+    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory();
+    StreamPartitionMsgOffset nextOffsetWithType = offsetFactory.create(nextOffset);
+    StreamPartitionMsgOffset offsetAtSLA;
+    StreamPartitionMsgOffset latestOffset;
+    try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId,
+        partitionId)) {
+      // Fetching timestamp from an offset is an expensive operation which requires reading the data,
+      // while fetching offset from timestamp is lightweight and only needs to read metadata.
+      // Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
+      // (CurrentTime - SLA)'s offset > nextOffset.
+      // TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to
+      // get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
+      latestOffset = metadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+      LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+          latestOffset);
+      offsetAtSLA =
+          metadataProvider.getOffsetAtTimestamp(partitionId, System.currentTimeMillis() - timeThreshold * 1000,
+              STREAM_FETCH_TIMEOUT_MS);
+      LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+          offsetAtSLA);
+    } catch (Exception e) {
+      LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting offsets", e);
+      return nextOffset;
+    }
+    try {
+      if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
+        LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,

Review Comment:
   If a reset happens, the user must have enabled it explicitly through the
config. The reset is then "expected" behavior, so I don't think it should be
logged as a warning.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java:
##########
@@ -143,6 +143,25 @@ private StreamConfigProperties() {
   public static final String PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS =
       "realtime.segment.pauseless.download.timeoutSeconds";
 
+  /**
+   * Config used to enable offset auto reset during segment commit.
+   */
+  public static final String ENABLE_OFFSET_AUTO_RESET = "realtime.segment.offsetAutoReset.enable";
+
+  /**
+   * During segment commit, the new segment startOffset would skip to the latest offset if thisValue is set as positive
+   * and (latestStreamOffset - latestIngestedOffset > thisValue)
+   */
+  public static final String OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY =
+      "realtime.segment.offsetAutoReset.offsetThreshold";
+
+  /**
+   * During segment commit, the new segment startOffset would skip to the latest offset if thisValue is set as positive
+   * and (latestStreamOffset's timestamp - latestIngestedOffset's timestamp > thisValue)
+   */
+  public static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY =
+      "realtime.segment.offsetAutoReset.timeSecThreshold";

Review Comment:
   Seconds should be enough. The calculation is usually not accurate enough to
warrant milliseconds.
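
To illustrate the seconds-based unit, here is a small sketch that reads the time threshold from a plain config map and converts it to milliseconds only at the point of use. The two property names are copied from the diff above; the class `OffsetAutoResetConfigSketch`, the `timeThresholdMs` helper, the use of a bare `Map<String, String>`, and the default of 0 are assumptions for illustration and may not match how `StreamConfig` actually parses these keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class OffsetAutoResetConfigSketch {
  // Property names copied from the diff above.
  static final String ENABLE_OFFSET_AUTO_RESET = "realtime.segment.offsetAutoReset.enable";
  static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY = "realtime.segment.offsetAutoReset.timeSecThreshold";

  private OffsetAutoResetConfigSketch() {
  }

  /** Returns the time threshold in milliseconds, or 0 when auto reset is disabled or the threshold is not positive. */
  static long timeThresholdMs(Map<String, String> streamConfigMap) {
    // Boolean.parseBoolean(null) is false, so a missing flag means "disabled".
    if (!Boolean.parseBoolean(streamConfigMap.get(ENABLE_OFFSET_AUTO_RESET))) {
      return 0L;
    }
    // Assumed default: an unset threshold is treated as 0 (no time-based reset).
    long seconds = Long.parseLong(streamConfigMap.getOrDefault(OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY, "0"));
    return seconds > 0 ? TimeUnit.SECONDS.toMillis(seconds) : 0L;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(ENABLE_OFFSET_AUTO_RESET, "true");
    config.put(OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY, "3600");
    System.out.println(timeThresholdMs(config)); // prints 3600000
  }
}
```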


