junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2283693210
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java:
##########
@@ -61,11 +61,13 @@ public abstract class HeaderFrom<R extends
ConnectRecord<R>> implements Transfor
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELDS_FIELD, ConfigDef.Type.LIST,
- NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ NO_DEFAULT_VALUE,
+ new NonEmptyListValidator(),
Review Comment:
Should we consolidate on `ConfigDef.ValidList.anyNonDuplicateValues(false,
false)`? Ditto below.
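To make the intent of the two `false` flags concrete, here is a minimal sketch of a list check that rejects null, empty, and duplicate values. The class `NonDuplicateListCheck` is a hypothetical stand-in written for illustration only; it is not Kafka's `ConfigDef.ValidList`, and the duplicate-rejection behavior is an assumption inferred from the method name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the validator under discussion; not Kafka's ConfigDef.ValidList.
final class NonDuplicateListCheck {
    private final boolean isEmptyAllowed;
    private final boolean isNullAllowed;

    NonDuplicateListCheck(boolean isEmptyAllowed, boolean isNullAllowed) {
        this.isEmptyAllowed = isEmptyAllowed;
        this.isNullAllowed = isNullAllowed;
    }

    // Throws IllegalArgumentException when the value violates the configured rules.
    void ensureValid(String name, List<String> values) {
        if (values == null) {
            if (isNullAllowed) {
                return;
            }
            throw new IllegalArgumentException("Configuration '" + name + "' values must not be null.");
        }
        if (values.isEmpty() && !isEmptyAllowed) {
            throw new IllegalArgumentException("Configuration '" + name + "' must not be empty.");
        }
        // Assumed semantics: every entry may appear at most once.
        Set<String> seen = new HashSet<>();
        for (String v : values) {
            if (!seen.add(v)) {
                throw new IllegalArgumentException("Configuration '" + name + "' contains duplicate value: " + v);
            }
        }
    }
}
```

With both flags set to `false`, this sketch accepts `["a", "b"]` and rejects `null`, `[]`, and `["a", "a"]`.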
##########
clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java:
##########
@@ -123,8 +124,8 @@ public class SslConfigs {
public static void addClientSslSupport(ConfigDef config) {
config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING,
SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM,
SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING,
null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
- .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW,
SslConfigs.SSL_CIPHER_SUITES_DOC)
- .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS,
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+ .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG,
ConfigDef.Type.LIST, List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true,
false), ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
+ .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS,
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
Review Comment:
It would be useful to document that if this is empty, it will use the
protocols enabled by default in SSLEngine.
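A rough sketch of the behavior worth documenting, using only the JDK's `SSLEngine` API. The class `EnabledProtocolsSketch` and its methods are hypothetical names for illustration and do not reproduce Kafka's SSL factory code; the point is just that an empty list leaves the engine's defaults in place.

```java
import java.security.NoSuchAlgorithmException;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// Hypothetical illustration; not Kafka's actual SSL engine setup.
final class EnabledProtocolsSketch {
    // Applies the configured protocols only when the list is non-empty.
    // An empty (or null) list leaves the engine's default enabled protocols untouched.
    static void applyEnabledProtocols(SSLEngine engine, List<String> configured) {
        if (configured != null && !configured.isEmpty()) {
            engine.setEnabledProtocols(configured.toArray(new String[0]));
        }
    }

    // Creates an engine from the JVM's default SSLContext.
    static SSLEngine newDefaultEngine() {
        try {
            return SSLContext.getDefault().createSSLEngine();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }
}
```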
##########
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##########
@@ -1006,26 +1006,58 @@ else if (max == null)
public static class ValidList implements Validator {
final ValidString validString;
+ final boolean isEmptyAllowed;
+ final boolean isNullAllowed;
- private ValidList(List<String> validStrings) {
+ private ValidList(List<String> validStrings, boolean isEmptyAllowed,
boolean isNullAllowed) {
this.validString = new ValidString(validStrings);
+ this.isEmptyAllowed = isEmptyAllowed;
+ this.isNullAllowed = isNullAllowed;
+ }
+
+ public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed,
boolean isNullAllowed) {
+ return new ValidList(List.of(), isEmptyAllowed, isNullAllowed);
}
public static ValidList in(String... validStrings) {
- return new ValidList(Arrays.asList(validStrings));
+ return new ValidList(List.of(validStrings), true, false);
+ }
+
+ public static ValidList in(boolean isEmptyAllowed, String...
validStrings) {
+ if (validStrings.length == 0) {
Review Comment:
Should we check isEmptyAllowed first?
##########
core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala:
##########
@@ -34,7 +34,7 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
* classpath and will be instantiated at run-time.
*/
val reporters: Seq[String] =
Csv.parseCsvList(props.getString(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG,
- MetricConfigs.KAFKA_METRIC_REPORTER_CLASSES_DEFAULT)).asScala
+ String.join("",
MetricConfigs.KAFKA_METRIC_REPORTER_CLASSES_DEFAULT))).asScala
Review Comment:
This works, but it is unintuitive. Could we rewrite it? Something like the following?
```
val reporters: Seq[String] = (if (props.containsKey(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG))
  Csv.parseCsvList(props.getString(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG))
else
  MetricConfigs.KAFKA_METRIC_REPORTER_CLASSES_DEFAULT).asScala
```
##########
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##########
@@ -199,14 +199,18 @@ public static List<Endpoint> listenerListToEndPoints(
}
public static List<Endpoint> listenerListToEndPoints(
- String input,
+ List<String> input,
Function<ListenerName, SecurityProtocol> nameToSecurityProto
) {
List<Endpoint> results = new ArrayList<>();
- for (String entry : Csv.parseCsvList(input.trim())) {
- Matcher matcher = URI_PARSE_REGEXP.matcher(entry);
+ for (String entry : input) {
+ String trimEntry = entry.trim();
Review Comment:
Before, we only trimmed the whole list, not the individual items in it.
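To illustrate the distinction being raised, here is a small sketch contrasting the two trimming strategies. It uses plain `String.split(",")` as a stand-in and deliberately does not reproduce `Csv.parseCsvList`; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "trim the whole string" versus "trim each entry".
final class TrimSketch {
    // Trims only the outer string, then splits on a bare comma.
    // Whitespace next to the commas survives inside the entries.
    static List<String> trimWhole(String input) {
        return List.of(input.trim().split(","));
    }

    // Splits on a bare comma, then trims every entry individually.
    static List<String> trimEach(String input) {
        List<String> out = new ArrayList<>();
        for (String entry : input.split(",")) {
            out.add(entry.trim());
        }
        return out;
    }
}
```

For the input `" A://h:1 , B://h:2 "`, `trimWhole` keeps the inner spaces (`"A://h:1 "` and `" B://h:2"`), while `trimEach` yields `"A://h:1"` and `"B://h:2"`.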
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -137,7 +144,8 @@ public Map<ListenerName, SecurityProtocol>
effectiveListenerSecurityProtocolMap(
// 2. No SSL or SASL protocols are used in regular listeners
(Note: controller listeners
// are not included in 'listeners' config when
process.roles=broker)
if
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl)
||
-
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+ getList(SocketServerConfigs.LISTENERS_CONFIG).stream()
+ .map(String::trim)
Review Comment:
Is this needed? If so, should we apply it to the advertised listeners too?
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -79,7 +80,13 @@ public AbstractKafkaConfig(ConfigDef definition, Map<?, ?>
originals, Map<String
}
public List<String> logDirs() {
- return
Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)));
+ return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG))
+
.orElse(Arrays.stream(getString(ServerLogConfigs.LOG_DIR_CONFIG).split(","))
Review Comment:
Hmm, LOG_DIR_CONFIG should contain only a single dir. Why do we need to
split it?
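If `LOG_DIR_CONFIG` does hold exactly one directory, the fallback could wrap the value instead of splitting it. A minimal sketch, assuming the list-valued setting may be null; `resolveLogDirs` and its parameters are hypothetical names, not part of `AbstractKafkaConfig`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not Kafka's actual logDirs() implementation.
final class LogDirsSketch {
    // Prefers the list-valued setting; otherwise wraps the single directory without splitting.
    // orElseGet defers building the fallback list until it is actually needed.
    static List<String> resolveLogDirs(List<String> logDirs, String logDir) {
        return Optional.ofNullable(logDirs).orElseGet(() -> List.of(logDir));
    }
}
```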
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -459,7 +453,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
val advertisedListenersProp =
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
- CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+
CoreUtils.listenerListToEndPoints(Csv.parseCsvList(advertisedListenersProp),
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
Review Comment:
Should we change advertised.listeners and controller.listener.names to type
List in the KIP too?
##########
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##########
@@ -1006,26 +1006,58 @@ else if (max == null)
public static class ValidList implements Validator {
final ValidString validString;
+ final boolean isEmptyAllowed;
+ final boolean isNullAllowed;
- private ValidList(List<String> validStrings) {
+ private ValidList(List<String> validStrings, boolean isEmptyAllowed,
boolean isNullAllowed) {
this.validString = new ValidString(validStrings);
+ this.isEmptyAllowed = isEmptyAllowed;
+ this.isNullAllowed = isNullAllowed;
+ }
+
+ public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed,
boolean isNullAllowed) {
+ return new ValidList(List.of(), isEmptyAllowed, isNullAllowed);
}
public static ValidList in(String... validStrings) {
- return new ValidList(Arrays.asList(validStrings));
+ return new ValidList(List.of(validStrings), true, false);
+ }
+
+ public static ValidList in(boolean isEmptyAllowed, String...
validStrings) {
+ if (validStrings.length == 0) {
+ throw new IllegalArgumentException("Valid strings list cannot
be empty for inNonEmpty validator");
+ }
+ return new ValidList(List.of(validStrings), isEmptyAllowed, false);
}
@Override
public void ensureValid(final String name, final Object value) {
+ if (value == null && isNullAllowed) {
+ return;
+ } else if (value == null) {
+ throw new ConfigException("Configuration '" + name + "' values
must not be null.");
+ }
Review Comment:
The following seems a bit clearer?
```
if (value == null) {
    if (isNullAllowed)
        return;
    else
        throw new ConfigException("Configuration '" + name + "' values must not be null.");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.