OmniaGM commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541313630
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1502,13 +1502,23 @@ public static <K, V> Map<K, V> filterMap(final Map<K,
V> map, final Predicate<En
* @return a map including all elements in properties
*/
public static Map<String, Object> propsToMap(Properties properties) {
- Map<String, Object> map = new HashMap<>(properties.size());
- for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ return castToStringObjectMap(properties);
+ }
+
+ /**
+ * Cast a map with arbitrary type keys to be keyed on String.
+ * @param inputMap A map with unknown type keys
+ * @return A map with the same contents as the input map, but with String
keys
+ * @throws ConfigException if any key is not a String
+ */
+ public static Map<String, Object> castToStringObjectMap(Map<?, ?>
inputMap) {
Review Comment:
There is a similar function like this one in
`AbstractConfigTest.convertPropertiesToMap` maybe we can drop the one in
`AbstractConfigTest`
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -925,10 +926,13 @@ static Map<String, Object> adminConfigs(String connName,
// Ignore configs that begin with "admin." since those will be added
next (with the prefix stripped)
// and those that begin with "producer." and "consumer.", since we
know they aren't intended for
// the admin client
+ // Also ignore the config.providers configurations because the
worker-configured ConfigProviders should
+ // already have been evaluated via the trusted WorkerConfig constructor
Map<String, Object> nonPrefixedWorkerConfigs =
config.originals().entrySet().stream()
.filter(e -> !e.getKey().startsWith("admin.")
&& !e.getKey().startsWith("producer.")
- && !e.getKey().startsWith("consumer."))
+ && !e.getKey().startsWith("consumer.")
+ &&
!e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG))
Review Comment:
Small suggestion, as the list of config isn't that huge then I would suggest
refactoring this to something similar to the following
```
Stream<String> prefixes = Stream.of("admin.", "producer.", "consumer.",
AbstractConfig.CONFIG_PROVIDERS_CONFIG);
Map<String, Object> nonPrefixedWorkerConfigs =
config.originals().entrySet().stream()
.filter(e -> prefixes.allMatch(s ->
!e.getKey().startsWith(s)))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
```
##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -580,8 +601,15 @@ private Map<String, ConfigProvider>
instantiateConfigProviders(Map<String, Strin
for (String provider : configProviders.split(",")) {
String providerClass = providerClassProperty(provider);
- if (indirectConfigs.containsKey(providerClass))
- providerMap.put(provider, indirectConfigs.get(providerClass));
+ if (indirectConfigs.containsKey(providerClass)) {
+ String providerClassName = indirectConfigs.get(providerClass);
+ if (classNameFilter.test(providerClassName)) {
+ providerMap.put(provider, providerClassName);
+ } else {
+ throw new ConfigException(providerClassName + " is not
allowed. Update System property '"
+ + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to
allow " + providerClassName);
+ }
+ }
Review Comment:
There is an extra line here need to be removed
##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -580,8 +601,15 @@ private Map<String, ConfigProvider>
instantiateConfigProviders(Map<String, Strin
for (String provider : configProviders.split(",")) {
String providerClass = providerClassProperty(provider);
- if (indirectConfigs.containsKey(providerClass))
- providerMap.put(provider, indirectConfigs.get(providerClass));
+ if (indirectConfigs.containsKey(providerClass)) {
+ String providerClassName = indirectConfigs.get(providerClass);
+ if (classNameFilter.test(providerClassName)) {
Review Comment:
small suggestion here instead of having 2 nested `if`s inside a `for` maybe
we can something like this
```
Optional<String> providerClassName =
Optional.ofNullable(indirectConfigs.get(providerClass));
Boolean isAllowed = providerClassName.map(name ->
classNameFilter.test(name)).orElse(false);
if (isAllowed) {
providerMap.put(provider, providerClassName.get());
} else {
throw new ConfigException(providerClassName + " is not
allowed. Update System property '"
+ AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow
" + providerClassName);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]