This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5701117 Classify timeout exception as TransientException so that the caller can retry on it. (#8043) 5701117 is described below commit 5701117fed61476e2da2e4428216e02f7d5bf217 Author: Ting Chen <tingc...@uber.com> AuthorDate: Fri Jan 21 14:13:30 2022 -0800 Classify timeout exception as TransientException so that the caller can retry on it. (#8043) --- .../kafka20/KafkaStreamMetadataProvider.java | 31 +++++++++++++++------- .../spi/stream/PartitionGroupMetadataFetcher.java | 2 +- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index 519b3d0..5af4d7d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -22,11 +22,13 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.time.Duration; import java.util.Collections; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.stream.TransientConsumerException; public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler @@ -42,23 +44,32 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa @Override public int fetchPartitionCount(long timeoutMillis) { - return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); + try { + return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size(); + } catch (TimeoutException e) { + throw new TransientConsumerException(e); + } } @Override public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { Preconditions.checkNotNull(offsetCriteria); long offset; - if (offsetCriteria.isLargest()) { - offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) - .get(_topicPartition); - } else if (offsetCriteria.isSmallest()) { - offset = _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) - .get(_topicPartition); - } else { - throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); + try { + if (offsetCriteria.isLargest()) { + offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) + .get(_topicPartition); + } else if (offsetCriteria.isSmallest()) { + offset = + _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)) + .get(_topicPartition); + } else { + throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); + } + return new LongMsgOffset(offset); + } catch (TimeoutException e) { + throw new TransientConsumerException(e); } - return new LongMsgOffset(offset); } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 58cbb3f..6ffdbe0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -74,7 +74,7 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { } return Boolean.TRUE; } catch (TransientConsumerException e) { - LOGGER.warn("Could not get partition count for topic {}", _topicName, e); + LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicName, e); _exception = e; return Boolean.FALSE; } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org