junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2304758593
##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -502,7 +508,7 @@ public class ProducerConfig extends AbstractConfig {
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
- new ConfigDef.NonNullValidator(),
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
Review Comment:
This is an existing issue. Should we change `Collections.emptyList()` to `List.of()` above?
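For illustration, a minimal standalone sketch of a LIST config defined with an immutable `List.of()` default (hypothetical class, config name, and doc string; the validator is the one this PR introduces):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.List;

public class InterceptorConfigExample {
    // Sketch only: the empty default is the immutable List.of() rather than
    // Collections.emptyList(); everything else mirrors the diff above.
    static final ConfigDef CONFIG = new ConfigDef()
            .define("interceptor.classes",
                    Type.LIST,
                    List.of(),                                               // immutable empty default
                    ConfigDef.ValidList.anyNonDuplicateValues(true, false),  // validator added by this PR
                    Importance.LOW,
                    "Interceptor classes to apply, in order.");
}
```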
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -615,7 +615,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
Collections.emptyList(),
- new ConfigDef.NonNullValidator(),
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
Review Comment:
This is an existing issue. Should we change `Collections.emptyList()` to `List.of()` above?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -246,15 +247,19 @@ protected static ConfigDef baseConfigDef() {
Importance.LOW,
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
- JmxReporter.class.getName(), Importance.LOW,
+ JmxReporter.class.getName(),
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+ Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
.define(HEADER_CONVERTER_VERSION, Type.STRING,
HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW,
HEADER_CONVERTER_VERSION_DOC)
- .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
+ .define(CONFIG_PROVIDERS_CONFIG,
+ Type.LIST,
Collections.emptyList(),
+ ConfigDef.ValidList.anyNonDuplicateValues(true, false),
Review Comment:
This is an existing issue. Should we change `Collections.emptyList()` to `List.of()` above?
##########
server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:
##########
@@ -36,8 +38,8 @@ public class ServerLogConfigs {
public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
- public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
- public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+ public static final List<String> LOG_DIR_DEFAULT = List.of("/tmp/kafka-logs");
+ public static final String LOG_DIR_DOC = "The directories in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
Review Comment:
Perhaps we could say `A comma-separated list of the directories where the log data is stored. Synonym to LOG_DIRS_CONFIG`.
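A minimal sketch of the two constants with that wording (same field names as the diff above; the exact phrasing is only a suggestion):

```java
public static final List<String> LOG_DIR_DEFAULT = List.of("/tmp/kafka-logs");
public static final String LOG_DIR_DOC = "A comma-separated list of the directories where the log data is stored. " +
        "Synonym to " + LOG_DIRS_CONFIG + ".";
```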
##########
core/src/test/scala/unit/kafka/KafkaConfigTest.scala:
##########
@@ -90,15 +94,34 @@ class KafkaConfigTest {
"requirement failed: The listeners config must only contain KRaft
controller listeners from controller.listener.names when
process.roles=controller")
properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
- assertBadConfigContainingMessage(properties,
- "No security protocol defined for listener CONTROLLER")
+ properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT")
+ KafkaConfig.fromProps(properties)
+ }
+ @Test
+ def testControllerListenerNamesMismatch(): Unit = {
+ val properties = new Properties()
+ properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+ properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
+ properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "OTHER")
+ properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
+ properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"CONTROLLER:PLAINTEXT")
+
assertBadConfigContainingMessage(properties,
"requirement failed: The listeners config must only contain KRaft
controller listeners from controller.listener.names when
process.roles=controller")
+ }
- properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
- KafkaConfig.fromProps(properties)
+ @Test
+ def testControllerSecurityProtocolMissing(): Unit = {
Review Comment:
testControllerSecurityProtocolMissing => testControllerSecurityProtocolMapMissing
##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java:
##########
@@ -61,11 +61,13 @@ public abstract class HeaderFrom<R extends
ConnectRecord<R>> implements Transfor
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELDS_FIELD, ConfigDef.Type.LIST,
- NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ NO_DEFAULT_VALUE,
+ new NonEmptyListValidator(),
ConfigDef.Importance.HIGH,
"Field names in the record whose values are to be copied
or moved to headers.")
.define(HEADERS_FIELD, ConfigDef.Type.LIST,
- NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+ NO_DEFAULT_VALUE,
+ new NonEmptyListValidator(),
Review Comment:
Changes in this file are not needed.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -713,32 +713,6 @@ public void testInterceptorConstructorClose(GroupProtocol
groupProtocol) {
}
}
- @ParameterizedTest
- @EnumSource(GroupProtocol.class)
- public void
testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol
groupProtocol) {
Review Comment:
Why is this test removed?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -144,8 +144,8 @@ public Optional<String> serverConfigName(String configName)
{
public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef()
.define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
- .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
- .define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
+ .define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH, ServerLogConfigs.LOG_DIR_DOC)
Review Comment:
This shouldn't be nullable, right?
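For reference, a sketch of the non-nullable variant, assuming the two boolean flags read as (allowEmpty, allowNull) the way the other call sites in this PR suggest:

```java
// Hypothetical: if the flags are (allowEmpty, allowNull), a log.dir value should
// allow neither an empty list nor null, so both would be false.
.define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT,
        ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, ServerLogConfigs.LOG_DIR_DOC)
```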
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -137,7 +144,8 @@ public Map<ListenerName, SecurityProtocol>
effectiveListenerSecurityProtocolMap(
// 2. No SSL or SASL protocols are used in regular listeners (Note: controller listeners
// are not included in 'listeners' config when process.roles=broker)
if (controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl) ||
- Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+ getList(SocketServerConfigs.LISTENERS_CONFIG).stream()
+ .map(String::trim)
Review Comment:
Do we need to call `trim()` here?
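One quick way to answer that is a hypothetical snippet (not part of the PR) that runs a padded listeners string through ConfigDef's LIST parsing to see whether the entries already come back trimmed:

```java
import org.apache.kafka.common.config.ConfigDef;

public class ListenerParseCheck {
    public static void main(String[] args) {
        // If ConfigDef already strips whitespace around commas when parsing LIST
        // values, the extra map(String::trim) in the diff above is redundant.
        Object parsed = ConfigDef.parseType("listeners",
                " PLAINTEXT://:9092 , SSL://:9093 ", ConfigDef.Type.LIST);
        System.out.println(parsed);  // expected [PLAINTEXT://:9092, SSL://:9093] if entries are trimmed
    }
}
```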
##########
server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java:
##########
@@ -70,12 +70,12 @@ public class KRaftConfigs {
public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
+ .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in(false, "broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
.define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
.define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
.define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)
- .define(CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, CONTROLLER_LISTENER_NAMES_DOC)
+ .define(CONTROLLER_LISTENER_NAMES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, CONTROLLER_LISTENER_NAMES_DOC)
Review Comment:
This is an existing issue. In the broker config doc on the website, we need to include this as an essential config.
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3187,6 +3187,55 @@ class UnifiedLogTest {
assertEquals(segments, log.numberOfSegments, "There should be 3 segments remaining")
}
+ @Test
+ def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = {
+ def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+ val recordSize = createRecords.sizeInBytes
+ val logConfig = LogTestUtils.createLogConfig(
+ segmentBytes = recordSize * 2,
+ localRetentionBytes = recordSize / 2,
+ cleanupPolicy = "",
+ remoteLogStorageEnable = true
+ )
+ val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+ for (_ <- 0 until 10)
+ log.appendAsLeader(createRecords, 0)
+
+ val segmentsBefore = log.numberOfSegments
+ log.updateHighWatermark(log.logEndOffset)
+ log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
+ val deleteOldSegments = log.deleteOldSegments()
+
+ assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to size retention")
+ assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
+ }
+
+ @Test
+ def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = {
+ def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+ val recordSize = createRecords.sizeInBytes
+ val logConfig = LogTestUtils.createLogConfig(
+ segmentBytes = recordSize * 2,
+ localRetentionMs = 10000,
+ cleanupPolicy = "",
+ remoteLogStorageEnable = true
+ )
+ val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+ for (_ <- 0 until 10)
+ log.appendAsLeader(createRecords, 0)
+
+ // mark the oldest segment as older than retention.ms
+ log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)
Review Comment:
Hmm, why do we need this? The time-based retention should be based on the timestamp in the records, right?