This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9ddce8d6055 Only include kafka properties when creating kafka clients
(#16965)
9ddce8d6055 is described below
commit 9ddce8d6055e43542ebe8cd416269cda0c456bd6
Author: harold-kfuse <[email protected]>
AuthorDate: Wed Oct 8 11:45:51 2025 -0700
Only include kafka properties when creating kafka clients (#16965)
---
.../KafkaPartitionLevelConnectionHandler.java | 26 ++++++++++++++++++++--
.../KafkaPartitionLevelConnectionHandler.java | 26 ++++++++++++++++++++--
2 files changed, 48 insertions(+), 4 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index e63f1d83388..e3362a0577a 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -23,9 +23,11 @@ import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -49,6 +51,8 @@ import org.slf4j.LoggerFactory;
public abstract class KafkaPartitionLevelConnectionHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
+ private static final Set<String> CONSUMER_CONFIG_NAMES =
ConsumerConfig.configNames();
+ private static final Set<String> ADMIN_CLIENT_CONFIG_NAMES =
AdminClientConfig.configNames();
protected final KafkaPartitionLevelStreamConfig _config;
protected final String _clientId;
protected final int _partition;
@@ -82,13 +86,31 @@ public abstract class KafkaPartitionLevelConnectionHandler {
return consumerProp;
}
+ /**
+ * Filter properties to only include the specified Kafka configurations.
+ * This prevents "was supplied but isn't a known config" warnings from Kafka
clients.
+ *
+ * @param props The properties to filter
+ * @param validConfigNames The set of valid configuration names for the
target Kafka client
+ * @return A new Properties object containing only the valid configurations
+ */
+ private Properties filterKafkaProperties(Properties props, Set<String>
validConfigNames) {
+ Properties filteredProps = new Properties();
+ for (String key : props.stringPropertyNames()) {
+ if (validConfigNames.contains(key)) {
+ filteredProps.put(key, props.get(key));
+ }
+ }
+ return filteredProps;
+ }
+
@VisibleForTesting
protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
- return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+ return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp,
CONSUMER_CONFIG_NAMES)), 5);
}
protected AdminClient createAdminClient() {
- return retry(() -> AdminClient.create(_consumerProp), 5);
+ return retry(() -> AdminClient.create(filterKafkaProperties(_consumerProp,
ADMIN_CLIENT_CONFIG_NAMES)), 5);
}
private static <T> T retry(Supplier<T> s, int nRetries) {
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
index e24a22c6c6f..ef39b256c7f 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConnectionHandler.java
@@ -23,9 +23,11 @@ import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -48,6 +50,8 @@ import org.slf4j.LoggerFactory;
public abstract class KafkaPartitionLevelConnectionHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
+ private static final Set<String> CONSUMER_CONFIG_NAMES =
ConsumerConfig.configNames();
+ private static final Set<String> ADMIN_CLIENT_CONFIG_NAMES =
AdminClientConfig.configNames();
protected final KafkaPartitionLevelStreamConfig _config;
protected final String _clientId;
protected final int _partition;
@@ -81,13 +85,31 @@ public abstract class KafkaPartitionLevelConnectionHandler {
return consumerProp;
}
+ /**
+ * Filter properties to only include the specified Kafka configurations.
+ * This prevents "was supplied but isn't a known config" warnings from Kafka
clients.
+ *
+ * @param props The properties to filter
+ * @param validConfigNames The set of valid configuration names for the
target Kafka client
+ * @return A new Properties object containing only the valid configurations
+ */
+ private Properties filterKafkaProperties(Properties props, Set<String>
validConfigNames) {
+ Properties filteredProps = new Properties();
+ for (String key : props.stringPropertyNames()) {
+ if (validConfigNames.contains(key)) {
+ filteredProps.put(key, props.get(key));
+ }
+ }
+ return filteredProps;
+ }
+
@VisibleForTesting
protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
- return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+ return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp,
CONSUMER_CONFIG_NAMES)), 5);
}
protected AdminClient createAdminClient() {
- return retry(() -> AdminClient.create(_consumerProp), 5);
+ return retry(() -> AdminClient.create(filterKafkaProperties(_consumerProp,
ADMIN_CLIENT_CONFIG_NAMES)), 5);
}
private static <T> T retry(Supplier<T> s, int nRetries) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]