mumrah commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1671034456
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource
configResource,
return ApiError.NONE;
}
+ void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion>
records) {
+ List<ConfigRecord> minIsrRecords = new ArrayList<>();
+ Map<String, String> topicToMinIsrValueMap = new HashMap<>();
+ Map<String, String> configRemovedTopicMap = new HashMap<>();
+ records.forEach(record -> {
+ if (MetadataRecordType.fromId(record.message().apiKey()) ==
MetadataRecordType.CONFIG_RECORD) {
+ ConfigRecord configRecord = (ConfigRecord) record.message();
+ if
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ minIsrRecords.add(configRecord);
+ if (Type.forId(configRecord.resourceType()) == Type.TOPIC)
{
+ if (configRecord.value() != null)
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+ else
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+ }
+ }
+ }
+ });
+
+ if (minIsrRecords.isEmpty()) return;
+ if (topicToMinIsrValueMap.size() == minIsrRecords.size()) {
Review Comment:
The size comparison here and below are a little non-obvious (to me at
least). Maybe we can set a boolean as we're looping through the records to
determine if we hit this branch.
Alternative question, is this optimization helping with performance? We
still need the code for the case of overlaying configs from different levels,
so having this separate code path just increases complexity.
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource
configResource,
return ApiError.NONE;
}
+ void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion>
records) {
Review Comment:
The complexity here is a tad bit high. Can we extract a method for getting
the minIsrRecords ConfigRecord from `List<ApiMessageAndVersion>`?
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -426,6 +497,35 @@ public void replay(ConfigRecord record) {
}
}
+ /**
+ * Apply a configuration record to the given config data. Note that, it
will store null for the config to be removed.
+ *
+ * @param record The ConfigRecord.
+ * @param localConfigData The config data is going to be updated.
+ */
+ public void replayForPendingConfig(
Review Comment:
Can we reuse ConfigurationsImage here instead of adding another place where
we are applying records? I think it should be reasonably straightforward to
construct a ConfigurationsImage with the in-memory state (`localConfigData`)
and then replay records to get a ConfigurationsDelta.
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource
configResource,
return ApiError.NONE;
}
+ void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion>
records) {
+ List<ConfigRecord> minIsrRecords = new ArrayList<>();
+ Map<String, String> topicToMinIsrValueMap = new HashMap<>();
+ Map<String, String> configRemovedTopicMap = new HashMap<>();
+ records.forEach(record -> {
+ if (MetadataRecordType.fromId(record.message().apiKey()) ==
MetadataRecordType.CONFIG_RECORD) {
+ ConfigRecord configRecord = (ConfigRecord) record.message();
+ if
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ minIsrRecords.add(configRecord);
+ if (Type.forId(configRecord.resourceType()) == Type.TOPIC)
{
+ if (configRecord.value() != null)
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+ else
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
Review Comment:
style nit: Can you reformat these to not be inline?
##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey
configKey,
translateConfigType(configKey.type()),
configKey.documentation);
}
+
+ /**
+ * OrderedConfigResolver helps to find the configs in the order of the
list config maps.
+ * One thing to notice that, when calling containsKey, if a config
contains a null value entry,
+ * it will return false as null value means the config value should be
ignored.
+ **/
+ public static class OrderedConfigResolver {
+ List<Map<String, ?>> configs;
+ public OrderedConfigResolver(List<Map<String, ?>> maps) {
+ configs = maps;
+ }
+
+ public OrderedConfigResolver(Map<String, ?> map) {
+ configs = new ArrayList<>();
+ configs.add(map);
+ }
+ public boolean containsKey(String key) {
Review Comment:
style nit: whitespace before method signature
##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey
configKey,
translateConfigType(configKey.type()),
configKey.documentation);
}
+
+ /**
+ * OrderedConfigResolver helps to find the configs in the order of the
list config maps.
+ * One thing to notice that, when calling containsKey, if a config
contains a null value entry,
+ * it will return false as null value means the config value should be
ignored.
+ **/
+ public static class OrderedConfigResolver {
+ List<Map<String, ?>> configs;
+ public OrderedConfigResolver(List<Map<String, ?>> maps) {
+ configs = maps;
+ }
+
+ public OrderedConfigResolver(Map<String, ?> map) {
+ configs = new ArrayList<>();
+ configs.add(map);
+ }
+ public boolean containsKey(String key) {
Review Comment:
docs: can you document this method's behavior here?
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource
configResource,
return ApiError.NONE;
}
+ void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion>
records) {
+ List<ConfigRecord> minIsrRecords = new ArrayList<>();
+ Map<String, String> topicToMinIsrValueMap = new HashMap<>();
+ Map<String, String> configRemovedTopicMap = new HashMap<>();
+ records.forEach(record -> {
+ if (MetadataRecordType.fromId(record.message().apiKey()) ==
MetadataRecordType.CONFIG_RECORD) {
+ ConfigRecord configRecord = (ConfigRecord) record.message();
+ if
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ minIsrRecords.add(configRecord);
+ if (Type.forId(configRecord.resourceType()) == Type.TOPIC)
{
+ if (configRecord.value() != null)
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+ else
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+ }
+ }
+ }
+ });
+
+ if (minIsrRecords.isEmpty()) return;
+ if (topicToMinIsrValueMap.size() == minIsrRecords.size()) {
+ // If all the min isr config updates are on the topic level, we
can trigger a simpler update just on the
+ // updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+ new ArrayList<>(topicToMinIsrValueMap.keySet()),
+ topicName -> topicToMinIsrValueMap.get(topicName))
+ );
+ return;
+ }
+
+ // Because it may require multiple layer look up for the min ISR
config value. Build a config data copy and apply
+ // the config updates to it. Use this config copy for the min ISR look
up.
+ Map<ConfigResource, Map<String, String>> pendingConfigData = new
HashMap<>();
+
+ for (ConfigRecord record : minIsrRecords) {
+ replayForPendingConfig(record, pendingConfigData);
+ }
+
+ ArrayList<String> topicList = new ArrayList<>();
+ // If all the updates are on the Topic level, we can avoid perform a
full scan of the partitions.
Review Comment:
Similar to the above comment, this size check is hard to grok. Maybe we can
compute a boolean to determine if we need to inspect every partition vs just a
subset.
In what case do we need to scan all partitions? Only when the cluster-level
`min.insync.replicas` is changed?
##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey
configKey,
translateConfigType(configKey.type()),
configKey.documentation);
}
+
+ /**
+ * OrderedConfigResolver helps to find the configs in the order of the
list config maps.
+ * One thing to notice that, when calling containsKey, if a config
contains a null value entry,
+ * it will return false as null value means the config value should be
ignored.
+ **/
+ public static class OrderedConfigResolver {
+ List<Map<String, ?>> configs;
+ public OrderedConfigResolver(List<Map<String, ?>> maps) {
+ configs = maps;
+ }
+
+ public OrderedConfigResolver(Map<String, ?> map) {
+ configs = new ArrayList<>();
+ configs.add(map);
+ }
+ public boolean containsKey(String key) {
+ for (Map<String, ?> config : configs) {
+ if (config.containsKey(key)) return config.get(key) != null;
Review Comment:
If a later config map has a non-null, I think we could mistakenly return
false here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]