chia7712 commented on code in PR #19371:
URL: https://github.com/apache/kafka/pull/19371#discussion_r2105869770
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java:
##########
@@ -198,7 +197,7 @@ public void testUpdateConfig() {
assertEquals(oldConfig, log.config());
Properties props = new Properties();
- props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize + 1);
+ props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG,
oldConfig.segmentSize() + 1);
Review Comment:
it can keep using `TopicConfig.SEGMENT_BYTES_CONFIG`, right?
##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -95,29 +95,29 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
@Test
def testDynamicTopicConfigChange(): Unit = {
val tp = new TopicPartition("test", 0)
- val oldSegmentSize = 1000
+ val oldSegmentSize = 2 * 1024 * 1024
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldSegmentSize.toString)
createTopic(tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
val logOpt = this.brokers.head.logManager.getLog(tp)
assertTrue(logOpt.isDefined)
- assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+ assertEquals(oldSegmentSize, logOpt.get.config.segmentSize())
}
val newSegmentSize = 2000
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
- val op = new AlterConfigOp(new
ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
+ val op = new AlterConfigOp(new
ConfigEntry(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
Review Comment:
this test is used to verify the alter, so you can use larger
`newSegmentSize` to keep using `TopicConfig.SEGMENT_BYTES_CONFIG`
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -248,7 +247,7 @@ public void
consumerConfigMustContainStreamPartitionAssignorConfig() {
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L);
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
7L);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
- props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
100);
+ props.put(StreamsConfig.topicPrefix("internal.segment.bytes"), 100);
Review Comment:
this is used to test the updated configs get returned, so we can increase
the configured value instead of using internal config.
##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -1166,4 +1161,25 @@ object KafkaMetadataLogTest {
}
dir
}
+
+ private def createMetadataLogConfig(
+ internalLogSegmentBytes: Int,
+ logSegmentMillis: Long,
+ retentionMaxBytes: Long,
+ retentionMillis: Long,
+ internalMaxBatchSizeInBytes: Int,
+ internalMaxFetchSizeInBytes: Int,
+ internalDeleteDelayMillis: Long
+ ): MetadataLogConfig = {
+ val config: util.Map[String, Any] = util.Map.of(
Review Comment:
`val config = util.Map.of(`
##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -1166,4 +1161,25 @@ object KafkaMetadataLogTest {
}
dir
}
+
+ private def createMetadataLogConfig(
+ internalLogSegmentBytes: Int,
+ logSegmentMillis: Long,
+ retentionMaxBytes: Long,
+ retentionMillis: Long,
+ internalMaxBatchSizeInBytes: Int,
Review Comment:
Could you please add default value to `internalMaxBatchSizeInBytes`,
`internalMaxFetchSizeInBytes`, and `internalDeleteDelayMillis` to streamline
the callers?
##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -166,6 +166,10 @@ public Map<String, ConfigEntry>
resolveEffectiveTopicConfigs(
ConfigDef configDef =
configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF);
HashMap<String, ConfigEntry> effectiveConfigs = new HashMap<>();
for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) {
+ // This config is internal; if the user hasn't set it explicitly,
it should not be returned.
+ if (configKey.internalConfig &&
!dynamicTopicConfigs.containsKey(configKey.name)) {
Review Comment:
Have you added test for this change?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -446,6 +447,14 @@ public static List<String> configNames() {
return CONFIG.names().stream().sorted().toList();
}
+ public static List<String> nonInternalConfigNames() {
+ return CONFIG.configKeys().entrySet()
+ .stream()
+ .filter(entry -> !entry.getValue().internalConfig)
+ .map(Map.Entry::getKey)
+ .sorted().toList();
+ }
+
public static Optional<String> serverConfigName(String configName) {
Review Comment:
it is unused now. please remove it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]