This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ddce8d6055 Only include kafka properties when creating kafka clients (#16965)
9ddce8d6055 is described below

commit 9ddce8d6055e43542ebe8cd416269cda0c456bd6
Author: harold-kfuse <[email protected]>
AuthorDate: Wed Oct 8 11:45:51 2025 -0700

    Only include kafka properties when creating kafka clients (#16965)
---
 .../KafkaPartitionLevelConnectionHandler.java      | 26 ++++++++++++++++++++--
 .../KafkaPartitionLevelConnectionHandler.java      | 26 ++++++++++++++++++++--
 2 files changed, 48 insertions(+), 4 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index e63f1d83388..e3362a0577a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -23,9 +23,11 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -49,6 +51,8 @@ import org.slf4j.LoggerFactory;
 public abstract class KafkaPartitionLevelConnectionHandler {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
+  private static final Set<String> CONSUMER_CONFIG_NAMES = ConsumerConfig.configNames();
+  private static final Set<String> ADMIN_CLIENT_CONFIG_NAMES = AdminClientConfig.configNames();
   protected final KafkaPartitionLevelStreamConfig _config;
   protected final String _clientId;
   protected final int _partition;
@@ -82,13 +86,31 @@ public abstract class KafkaPartitionLevelConnectionHandler {
     return consumerProp;
   }
 
+  /**
+   * Filter properties to only include the specified Kafka configurations.
+   * This prevents "was supplied but isn't a known config" warnings from Kafka clients.
+   *
+   * @param props The properties to filter
+   * @param validConfigNames The set of valid configuration names for the target Kafka client
+   * @return A new Properties object containing only the valid configurations
+   */
+  private Properties filterKafkaProperties(Properties props, Set<String> validConfigNames) {
+    Properties filteredProps = new Properties();
+    for (String key : props.stringPropertyNames()) {
+      if (validConfigNames.contains(key)) {
+        filteredProps.put(key, props.get(key));
+      }
+    }
+    return filteredProps;
+  }
+
   @VisibleForTesting
   protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
-    return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+    return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp, CONSUMER_CONFIG_NAMES)), 5);
   }
 
   protected AdminClient createAdminClient() {
-    return retry(() -> AdminClient.create(_consumerProp), 5);
+    return retry(() -> AdminClient.create(filterKafkaProperties(_consumerProp, ADMIN_CLIENT_CONFIG_NAMES)), 5);
   }
 
   private static <T> T retry(Supplier<T> s, int nRetries) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
index e24a22c6c6f..ef39b256c7f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
@@ -23,9 +23,11 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -48,6 +50,8 @@ import org.slf4j.LoggerFactory;
 public abstract class KafkaPartitionLevelConnectionHandler {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
+  private static final Set<String> CONSUMER_CONFIG_NAMES = ConsumerConfig.configNames();
+  private static final Set<String> ADMIN_CLIENT_CONFIG_NAMES = AdminClientConfig.configNames();
   protected final KafkaPartitionLevelStreamConfig _config;
   protected final String _clientId;
   protected final int _partition;
@@ -81,13 +85,31 @@ public abstract class KafkaPartitionLevelConnectionHandler {
     return consumerProp;
   }
 
+  /**
+   * Filter properties to only include the specified Kafka configurations.
+   * This prevents "was supplied but isn't a known config" warnings from Kafka clients.
+   *
+   * @param props The properties to filter
+   * @param validConfigNames The set of valid configuration names for the target Kafka client
+   * @return A new Properties object containing only the valid configurations
+   */
+  private Properties filterKafkaProperties(Properties props, Set<String> validConfigNames) {
+    Properties filteredProps = new Properties();
+    for (String key : props.stringPropertyNames()) {
+      if (validConfigNames.contains(key)) {
+        filteredProps.put(key, props.get(key));
+      }
+    }
+    return filteredProps;
+  }
+
   @VisibleForTesting
   protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
-    return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+    return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp, CONSUMER_CONFIG_NAMES)), 5);
   }
 
   protected AdminClient createAdminClient() {
-    return retry(() -> AdminClient.create(_consumerProp), 5);
+    return retry(() -> AdminClient.create(filterKafkaProperties(_consumerProp, ADMIN_CLIENT_CONFIG_NAMES)), 5);
   }
 
   private static <T> T retry(Supplier<T> s, int nRetries) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to