Jackie-Jiang commented on code in PR #16965:
URL: https://github.com/apache/pinot/pull/16965#discussion_r2411565986
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java:
##########
@@ -82,13 +84,31 @@ private Properties buildProperties(StreamConfig
streamConfig) {
return consumerProp;
}
+ /**
+ * Filter properties to only include the specified Kafka configurations.
+ * This prevents "was supplied but isn't a known config" warnings from Kafka
clients.
+ *
+ * @param props The properties to filter
+ * @param validConfigNames The set of valid configuration names for the
target Kafka client
+ * @return A new Properties object containing only the valid configurations
+ */
+ private Properties filterKafkaProperties(Properties props, Set<String>
validConfigNames) {
+ Properties filteredProps = new Properties();
+ for (String key : props.stringPropertyNames()) {
+ if (validConfigNames.contains(key)) {
+ filteredProps.put(key, props.get(key));
+ }
+ }
+ return filteredProps;
+ }
+
@VisibleForTesting
protected Consumer<String, Bytes> createConsumer(Properties consumerProp) {
- return retry(() -> new KafkaConsumer<>(consumerProp), 5);
+ return retry(() -> new KafkaConsumer<>(filterKafkaProperties(consumerProp,
ConsumerConfig.configNames())), 5);
Review Comment:
Seems like `ConsumerConfig.configNames()` will create a new set every time
being invoked. Can we cache it in a constant in the class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]