chia7712 commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2311409416
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java:
##########
@@ -139,12 +139,12 @@ public abstract class RestServerConfig extends
AbstractConfig {
public static void addPublicConfig(ConfigDef configDef) {
addInternalConfig(configDef);
configDef
- .define(
- REST_EXTENSION_CLASSES_CONFIG,
+ .define(REST_EXTENSION_CLASSES_CONFIG,
ConfigDef.Type.LIST,
- "",
- ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC
- ).define(ADMIN_LISTENERS_CONFIG,
+ List.of(),
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+ ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC)
+ .define(ADMIN_LISTENERS_CONFIG,
ConfigDef.Type.LIST,
null,
new AdminListenersValidator(),
Review Comment:
it seems `AdminListenersValidator` could be replaced by
`ConfigDef.ValidList.anyNonDuplicateValues(true, true)`, right?
##########
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##########
@@ -1006,26 +1006,59 @@ else if (max == null)
public static class ValidList implements Validator {
final ValidString validString;
+ final boolean isEmptyAllowed;
+ final boolean isNullAllowed;
- private ValidList(List<String> validStrings) {
+ private ValidList(List<String> validStrings, boolean isEmptyAllowed,
boolean isNullAllowed) {
this.validString = new ValidString(validStrings);
+ this.isEmptyAllowed = isEmptyAllowed;
+ this.isNullAllowed = isNullAllowed;
+ }
+
+ public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed,
boolean isNullAllowed) {
+ return new ValidList(List.of(), isEmptyAllowed, isNullAllowed);
}
public static ValidList in(String... validStrings) {
- return new ValidList(Arrays.asList(validStrings));
+ return new ValidList(List.of(validStrings), true, false);
+ }
+
+ public static ValidList in(boolean isEmptyAllowed, String...
validStrings) {
+ if (!isEmptyAllowed && validStrings.length == 0) {
+ throw new IllegalArgumentException("Valid strings list cannot
be empty for inNonEmpty validator");
Review Comment:
It seems this method was previously named `inNonEmpty`. Could you update the
error message to reflect that?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1906,16 +1906,25 @@ private int deleteSegments(List<LogSegment> deletable,
SegmentDeletionReason rea
/**
* If topic deletion is enabled, delete any local log segments that have
either expired due to time based
- * retention or because the log size is > retentionSize. Whether or not
deletion is enabled, delete any local
- * log segments that are before the log start offset
+ * retention or because the log size is > retentionSize. Empty
cleanup.policy is the same as delete with
+ * infinite retention, so we only need to delete local segments if remote
storage is enabled. Whether or
+ * not deletion is enabled, delete any local log segments that are before
the log start offset
*/
public int deleteOldSegments() throws IOException {
if (config().delete) {
Review Comment:
```java
if (config().delete)
return deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
if (config().compact) return deleteLogStartOffsetBreachedSegments();
// add documentation for this new behavior
if (remoteLogEnabledAndRemoteCopyEnabled())
return deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
// add documentation for this new behavior
return deleteLogStartOffsetBreachedSegments();
```
##########
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##########
@@ -63,7 +63,7 @@ public class SocketServerConfigs {
"is assumed if no explicit mapping is provided and no other
security protocol is in use.";
public static final String LISTENERS_CONFIG = "listeners";
- public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092";
+ public static final List<String> LISTENERS_DEFAULT =
List.of("PLAINTEXT://:9092");
Review Comment:
I'm not sure what the rule is for changing the default value from `String`
to `List<String>`. For example, the default value type of `metric.reporters` is
still `String` rather than `List<String>`
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java:
##########
@@ -132,8 +132,7 @@ protected static void
configureSslContextFactoryAlgorithms(SslContextFactory ssl
ssl.setProtocol((String) getOrDefault(sslConfigValues,
SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));
List<String> sslCipherSuites = (List<String>)
sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
- if (sslCipherSuites != null)
- ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
+ ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
Review Comment:
ditto on line#126
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -121,16 +122,16 @@ object CoreUtils {
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T =
inLock[T](lock.writeLock)(fun)
- def listenerListToEndPoints(listeners: String, securityProtocolMap:
java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
+ def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]):
Seq[Endpoint] = {
listenerListToEndPoints(listeners, securityProtocolMap,
requireDistinctPorts = true)
}
- private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners:
String): Unit = {
+ private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners:
java.util.List[String]): Unit = {
val distinctPorts = endpoints.map(_.port).distinct
- require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners:
${listeners.stream().collect(Collectors.joining(","))}")
Review Comment:
Do we really need `listeners.stream().collect(Collectors.joining(","))`? the
default implementation of `toString` should work fine in this case.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java:
##########
@@ -132,8 +132,7 @@ protected static void
configureSslContextFactoryAlgorithms(SslContextFactory ssl
ssl.setProtocol((String) getOrDefault(sslConfigValues,
SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));
List<String> sslCipherSuites = (List<String>)
sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
- if (sslCipherSuites != null)
- ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
+ ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
Review Comment:
Should we align the behavior with `DefaultSslEngineFactory` to set it only
if `sslCipherSuites` is not empty?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -716,23 +716,21 @@ public void testInterceptorConstructorClose(GroupProtocol
groupProtocol) {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void
testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol
groupProtocol) {
- final int targetInterceptor = 3;
+ final int targetInterceptor = 1;
Review Comment:
@m1a2st renaming it does not cover the test case of "remaining instances are
closed", right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]