gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1585294415
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String
connName,
ConnectorClientConfigRequest connectorClientConfigRequest = new
ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs,
clientType);
List<ConfigValue> configValues =
connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
- if (configValues != null) {
- for (ConfigValue validatedConfigValue : configValues) {
- ConfigKey configKey =
configKeys.get(validatedConfigValue.name());
- ConfigKeyInfo configKeyInfo = null;
- if (configKey != null) {
- if (configKey.group != null) {
- groups.add(configKey.group);
- }
- configKeyInfo = convertConfigKey(configKey, prefix);
- }
- ConfigValue configValue = new ConfigValue(prefix +
validatedConfigValue.name(), validatedConfigValue.value(),
-
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
- if (!configValue.errorMessages().isEmpty()) {
- errorCount++;
+ return prefixedConfigInfos(configDef.configKeys(), configValues,
prefix);
+ }
+
+ private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey>
configKeys, List<ConfigValue> configValues, String prefix) {
+ int errorCount = 0;
+ Set<String> groups = new LinkedHashSet<>();
+ List<ConfigInfo> configInfos = new ArrayList<>();
+
+ if (configValues == null) {
Review Comment:
I think this null check is only relevant when the value comes from
overridePolicy.validate. In validateConverterConfig, I think the result of the
ConfigDef#validate call will always be non-null.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ } catch (ClassNotFoundException | RuntimeException e) {
+ log.error("Failed to instantiate {} class {}; this should have
been caught by prior validation logic", pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load class " +
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ return null;
+ }
+
+ try {
+ ConfigDef configDef;
+ try {
Review Comment:
Do you want to add a stage for this call into the plugin?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ } catch (ClassNotFoundException | RuntimeException e) {
+ log.error("Failed to instantiate {} class {}; this should have
been caught by prior validation logic", pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load class " +
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
Review Comment:
I think mutating the passed-in value is correct here, but it's not clear
from the signature that this happens.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
Review Comment:
This initializes the plugin with the wrong classloader (and I guess applies
the configDefAccessor with the wrong classloader too).
There's already Plugins#newPlugin(String) which you could use, but it
returns Object. Perhaps we could add a Class<T> argument to make the type
signature better.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
Review Comment:
This is where the connector validation used the tempConnectors cache to
re-use the connector objects.
Personally I'm fine with per-call instantiation, but I thought it would be
worth mentioning.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -562,8 +709,13 @@ ConfigInfos validateConnectorConfig(
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
- ConfigInfos configInfos = generateResult(connType, configKeys,
configValues, new ArrayList<>(allGroups));
+ // do custom converter-specific validation
+ ConfigInfos headerConverterConfigInfos =
validateHeaderConverterConfig(connectorProps,
validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG));
+ ConfigInfos keyConverterConfigInfos =
validateKeyConverterConfig(connectorProps,
validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG));
+ ConfigInfos valueConverterConfigInfos =
validateValueConverterConfig(connectorProps,
validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG));
Review Comment:
nit: these constants are provided here, and also hard-coded inside these
functions
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -16,6 +16,15 @@
*/
package org.apache.kafka.common.utils;
+import java.lang.reflect.Modifier;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteOrder;
+import java.nio.file.StandardOpenOption;
+import java.util.AbstractMap;
+import java.util.EnumSet;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
Review Comment:
nit: merge conflict?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ } catch (ClassNotFoundException | RuntimeException e) {
+ log.error("Failed to instantiate {} class {}; this should have
been caught by prior validation logic", pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load class " +
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ return null;
+ }
+
+ try {
+ ConfigDef configDef;
+ try {
+ configDef = configDefAccessor.apply(pluginInstance);
+ } catch (RuntimeException e) {
+ log.error("Failed to load ConfigDef from {} of type {}",
pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load ConfigDef
from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ return null;
+ }
+ if (configDef == null) {
+ log.warn("{}.config() has returned a null ConfigDef; no
further preflight config validation for this converter will be performed",
pluginClass);
+ // Older versions of Connect didn't do any converter
validation.
+ // Even though converters are technically required to return a
non-null ConfigDef object from their config() method,
+ // we permit this case in order to avoid breaking existing
converters that, despite not adhering to this requirement,
+ // can be used successfully with a connector.
+ return null;
+ }
+ final String pluginPrefix = pluginProperty + ".";
+ Map<String, String> pluginConfig =
connectorConfig.entrySet().stream()
Review Comment:
nit: Utils#entriesWithPrefix ?
This would mirror the logic used in AbstractConfig#originalsWithPrefix which
is what is used during normal instantiation.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ } catch (ClassNotFoundException | RuntimeException e) {
+ log.error("Failed to instantiate {} class {}; this should have
been caught by prior validation logic", pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load class " +
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ return null;
+ }
+
+ try {
+ ConfigDef configDef;
+ try {
+ configDef = configDefAccessor.apply(pluginInstance);
+ } catch (RuntimeException e) {
Review Comment:
```suggestion
} catch (Exception e) {
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ } catch (ClassNotFoundException | RuntimeException e) {
+ log.error("Failed to instantiate {} class {}; this should have
been caught by prior validation logic", pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load class " +
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ return null;
+ }
+
+ try {
+ ConfigDef configDef;
+ try {
+ configDef = configDefAccessor.apply(pluginInstance);
+ } catch (RuntimeException e) {
+ log.error("Failed to load ConfigDef from {} of type {}",
pluginName, pluginClass, e);
+ pluginConfigValue.addErrorMessage("Failed to load ConfigDef
from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+ return null;
+ }
+ if (configDef == null) {
+ log.warn("{}.config() has returned a null ConfigDef; no
further preflight config validation for this converter will be performed",
pluginClass);
+ // Older versions of Connect didn't do any converter
validation.
+ // Even though converters are technically required to return a
non-null ConfigDef object from their config() method,
+ // we permit this case in order to avoid breaking existing
converters that, despite not adhering to this requirement,
+ // can be used successfully with a connector.
+ return null;
+ }
+ final String pluginPrefix = pluginProperty + ".";
+ Map<String, String> pluginConfig =
connectorConfig.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(pluginPrefix))
+ .collect(Collectors.toMap(
+ e -> e.getKey().substring(pluginPrefix.length()),
+ Map.Entry::getValue
+ ));
+ if (defaultProperties != null)
+ defaultProperties.forEach(pluginConfig::putIfAbsent);
+
+ List<ConfigValue> configValues;
+ try {
+ configValues = configDef.validate(pluginConfig);
+ } catch (RuntimeException e) {
Review Comment:
```suggestion
} catch (Exception e) {
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue>
validateSourceConnectorConfig(SourceConnector
return configDef.validateAll(config);
}
+ /**
+ * General-purpose validation logic for converters that are configured
directly
+ * in a connector config (as opposed to inherited from the worker config).
+ * @param connectorConfig the configuration for the connector; may not be
null
+ * @param pluginConfigValue the {@link ConfigValue} for the converter
property in the connector config;
+ * may be null, in which case no validation will
be performed under the assumption that the
+ * connector will use inherit the converter
settings from the worker
+ * @param pluginInterface the interface for the plugin type
+ * (e.g., {@code
org.apache.kafka.connect.storage.Converter.class});
+ * may not be null
+ * @param configDefAccessor an accessor that can be used to retrieve a
{@link ConfigDef}
+ * from an instance of the plugin type (e.g.,
{@code Converter::config});
+ * may not be null
+ * @param pluginName a lowercase, human-readable name for the type of
plugin (e.g., {@code "key converter"});
+ * may not be null
+ * @param pluginProperty the property used to define a custom class for
the plugin type
+ * in a connector config (e.g., {@link
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+ * may not be null
+ * @param defaultProperties any default properties to include in the
configuration that will be used for
+ * the plugin; may be null
+
+ * @return a {@link ConfigInfos} object containing validation results for
the plugin in the connector config,
+ * or null if no custom validation was performed (possibly because no
custom plugin was defined in the connector
+ * config)
+
+ * @param <T> the plugin class to perform validation for
+ */
+ private <T> ConfigInfos validateConverterConfig(
+ Map<String, String> connectorConfig,
+ ConfigValue pluginConfigValue,
+ Class<T> pluginInterface,
+ Function<T, ConfigDef> configDefAccessor,
+ String pluginName,
+ String pluginProperty,
+ Map<String, String> defaultProperties
+ ) {
+ Objects.requireNonNull(connectorConfig);
+ Objects.requireNonNull(pluginInterface);
+ Objects.requireNonNull(configDefAccessor);
+ Objects.requireNonNull(pluginName);
+ Objects.requireNonNull(pluginProperty);
+
+ String pluginClass = connectorConfig.get(pluginProperty);
+
+ if (pluginClass == null
+ || pluginConfigValue == null
+ || !pluginConfigValue.errorMessages().isEmpty()
+ ) {
+ // Either no custom converter was specified, or one was specified
but there's a problem with it.
+ // No need to proceed any further.
+ return null;
+ }
+
+ T pluginInstance;
+ try {
+ pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+ } catch (ClassNotFoundException | RuntimeException e) {
Review Comment:
```suggestion
} catch (Exception e) {
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String
connName,
ConnectorClientConfigRequest connectorClientConfigRequest = new
ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs,
clientType);
List<ConfigValue> configValues =
connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
- if (configValues != null) {
- for (ConfigValue validatedConfigValue : configValues) {
- ConfigKey configKey =
configKeys.get(validatedConfigValue.name());
- ConfigKeyInfo configKeyInfo = null;
- if (configKey != null) {
- if (configKey.group != null) {
- groups.add(configKey.group);
- }
- configKeyInfo = convertConfigKey(configKey, prefix);
- }
- ConfigValue configValue = new ConfigValue(prefix +
validatedConfigValue.name(), validatedConfigValue.value(),
-
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
- if (!configValue.errorMessages().isEmpty()) {
- errorCount++;
+ return prefixedConfigInfos(configDef.configKeys(), configValues,
prefix);
+ }
+
+ private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey>
configKeys, List<ConfigValue> configValues, String prefix) {
+ int errorCount = 0;
+ Set<String> groups = new LinkedHashSet<>();
+ List<ConfigInfo> configInfos = new ArrayList<>();
+
+ if (configValues == null) {
+ return new ConfigInfos("", 0, new ArrayList<>(groups),
configInfos);
+ }
+
+ for (ConfigValue validatedConfigValue : configValues) {
+ ConfigKey configKey = configKeys.get(validatedConfigValue.name());
+ ConfigKeyInfo configKeyInfo = null;
+ if (configKey != null) {
+ if (configKey.group != null) {
+ groups.add(configKey.group);
}
- ConfigValueInfo configValueInfo =
convertConfigValue(configValue, configKey != null ? configKey.type : null);
- configInfoList.add(new ConfigInfo(configKeyInfo,
configValueInfo));
+ configKeyInfo = convertConfigKey(configKey, prefix);
+ }
+
+ ConfigValue configValue = new ConfigValue(prefix +
validatedConfigValue.name(), validatedConfigValue.value(),
+ validatedConfigValue.recommendedValues(),
validatedConfigValue.errorMessages());
+ if (configValue.errorMessages().size() > 0) {
+ errorCount++;
}
+ ConfigValueInfo configValueInfo = convertConfigValue(configValue,
configKey != null ? configKey.type : null);
+ configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
}
- return new ConfigInfos(connectorClass.toString(), errorCount, new
ArrayList<>(groups), configInfoList);
+ return new ConfigInfos("", errorCount, new ArrayList<>(groups),
configInfos);
Review Comment:
nit: I don't like this empty string, but I see that it has no overall
effect. I wonder if the validator can just return List<ConfigInfo> and compute
errorCount/groups at the end.
This doesn't have to be done in this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]