This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2f76e37196 Reuse Kafka admin client for better performance (#16129) 2f76e37196 is described below commit 2f76e371968152fd428e08b2e66b196328920786 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Fri Jun 20 10:25:39 2025 +0530 Reuse Kafka admin client for better performance (#16129) Co-authored-by: KKCorps <kar...@startee.ai> --- .../plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java | 3 +++ .../pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java | 6 +++--- .../plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java | 2 ++ .../pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java | 6 +++--- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java index ea0a5093e8..fb7052f6ac 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java @@ -56,6 +56,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { protected final Consumer<String, Bytes> _consumer; protected final TopicPartition _topicPartition; protected final Properties _consumerProp; + protected final AdminClient _adminClient; public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) { _config = new KafkaPartitionLevelStreamConfig(streamConfig); @@ -67,6 +68,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { _consumer = createConsumer(_consumerProp); _topicPartition = new 
TopicPartition(_topic, _partition); _consumer.assign(Collections.singletonList(_topicPartition)); + _adminClient = createAdminClient(); } private Properties buildProperties(StreamConfig streamConfig) { @@ -116,6 +118,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { public void close() throws IOException { _consumer.close(); + _adminClient.close(); } @VisibleForTesting diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index a584a0ee59..0d963ea65b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -100,7 +100,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa @Override public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { Preconditions.checkNotNull(offsetCriteria); - try (AdminClient adminClient = createAdminClient()) { + try { // Build the offset spec request for this partition Map<TopicPartition, OffsetSpec> request = new HashMap<>(); if (offsetCriteria.isLargest()) { @@ -117,13 +117,13 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria); } // Query via AdminClient (thread-safe) - ListOffsetsResult result = adminClient.listOffsets(request); + ListOffsetsResult result = _adminClient.listOffsets(request); Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS); if (!isValidOffsetInfo(offsets) && 
(offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) { // fetch endOffsets as fallback request.put(_topicPartition, OffsetSpec.latest()); - result = adminClient.listOffsets(request); + result = _adminClient.listOffsets(request); offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS); LOGGER.warn( "initial offset type is {} and its value evaluates to null hence proceeding with offset {} " + "for " diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java index 92ee657a5a..81690b5380 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java @@ -55,6 +55,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { protected final Consumer<String, Bytes> _consumer; protected final TopicPartition _topicPartition; protected final Properties _consumerProp; + protected final AdminClient _adminClient; public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) { _config = new KafkaPartitionLevelStreamConfig(streamConfig); @@ -66,6 +67,7 @@ public abstract class KafkaPartitionLevelConnectionHandler { _consumer = createConsumer(_consumerProp); _topicPartition = new TopicPartition(_topic, _partition); _consumer.assign(Collections.singletonList(_topicPartition)); + _adminClient = createAdminClient(); } private Properties buildProperties(StreamConfig streamConfig) { diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java index 65c803804b..c3e9aadbbc 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java @@ -100,7 +100,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa @Override public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) { Preconditions.checkNotNull(offsetCriteria); - try (AdminClient adminClient = createAdminClient()) { + try { // Build the offset spec request for this partition Map<TopicPartition, OffsetSpec> request = new HashMap<>(); if (offsetCriteria.isLargest()) { @@ -117,13 +117,13 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa throw new IllegalArgumentException("Unknown offset criteria: " + offsetCriteria); } // Query via AdminClient (thread-safe) - ListOffsetsResult result = adminClient.listOffsets(request); + ListOffsetsResult result = _adminClient.listOffsets(request); Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS); if (!isValidOffsetInfo(offsets) && (offsetCriteria.isTimestamp() || offsetCriteria.isPeriod())) { // fetch endOffsets as fallback request.put(_topicPartition, OffsetSpec.latest()); - result = adminClient.listOffsets(request); + result = _adminClient.listOffsets(request); offsets = result.all().get(timeoutMillis, TimeUnit.MILLISECONDS); LOGGER.warn( "initial offset type is {} and its value evaluates to null hence proceeding with 
offset {} " + "for "
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org