KKcorps commented on code in PR #15641:
URL: https://github.com/apache/pinot/pull/15641#discussion_r2060692356


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -97,45 +100,40 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
-    long offset;
     try {
+      // Build the offset spec request for this partition
+      Map<TopicPartition, OffsetSpec> request = new HashMap<>();
       if (offsetCriteria.isLargest()) {
-        offset = 
_consumer.endOffsets(Collections.singletonList(_topicPartition), 
Duration.ofMillis(timeoutMillis))
-            .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.latest());
       } else if (offsetCriteria.isSmallest()) {
-        offset =
-            
_consumer.beginningOffsets(Collections.singletonList(_topicPartition), 
Duration.ofMillis(timeoutMillis))
-                .get(_topicPartition);
+        request.put(_topicPartition, OffsetSpec.earliest());
       } else if (offsetCriteria.isPeriod()) {
-        OffsetAndTimestamp offsetAndTimestamp = 
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-                Clock.systemUTC().millis() - 
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString())))
-            .get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = 
_consumer.endOffsets(Collections.singletonList(_topicPartition), 
Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is period and its value evaluates to null 
hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), 
_topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = Clock.systemUTC().millis() - 
TimeUtils.convertPeriodToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else if (offsetCriteria.isTimestamp()) {
-        OffsetAndTimestamp offsetAndTimestamp = 
_consumer.offsetsForTimes(Collections.singletonMap(_topicPartition,
-            
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString()))).get(_topicPartition);
-        if (offsetAndTimestamp == null) {
-          offset = 
_consumer.endOffsets(Collections.singletonList(_topicPartition), 
Duration.ofMillis(timeoutMillis))
-              .get(_topicPartition);
-          LOGGER.warn(
-              "initial offset type is timestamp and its value evaluates to 
null hence proceeding with offset {} for "
-                  + "topic {} partition {}", offset, _topicPartition.topic(), 
_topicPartition.partition());
-        } else {
-          offset = offsetAndTimestamp.offset();
-        }
+        long ts = 
TimeUtils.convertTimestampToMillis(offsetCriteria.getOffsetString());
+        request.put(_topicPartition, OffsetSpec.forTimestamp(ts));
       } else {
-        throw new IllegalArgumentException("Unknown initial offset value " + 
offsetCriteria);
+        throw new IllegalArgumentException("Unknown offset criteria: " + 
offsetCriteria);
       }
-      return new LongMsgOffset(offset);
-    } catch (TimeoutException e) {
+      // Query via AdminClient (thread-safe)
+      ListOffsetsResult result = _adminClient.listOffsets(request);
+      Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+          result.all().get(timeoutMillis, TimeUnit.MILLISECONDS);
+      if (offsets == null || offsets.isEmpty() || 
!offsets.containsKey(_topicPartition)
+          || offsets.get(_topicPartition).offset() < 0) {
+        // fetch endOffsets as fallback

Review Comment:
   good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to