junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2291889455
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1887,8 +1887,10 @@ public int deleteOldSegments() throws IOException {
return deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
- } else {
+ } else if (config().compact) {
return deleteLogStartOffsetBreachedSegments();
+ } else {
+ return deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments();
Review Comment:
Well, an empty cleanup.policy is the same as delete with retention time and
retention size set to -1, which tiered storage supports. So, empty
cleanup.policy should be supported by tiered storage. In the KIP, we have the
following.
`If cleanup.policy is empty and remote.storage.enable is set to true, the
local log segments will be cleaned based on the values of
log.local.retention.bytes and log.local.retention.ms.`
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -79,7 +80,11 @@ public AbstractKafkaConfig(ConfigDef definition, Map<?, ?>
originals, Map<String
}
public List<String> logDirs() {
- return
Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)));
+ return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG))
+
.orElse(Arrays.stream(getString(ServerLogConfigs.LOG_DIR_CONFIG).split(",")).toList())
Review Comment:
Should we just define LOG_DIR_CONFIG as a List then? If so, we need to
update the KIP.
##########
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##########
@@ -155,8 +155,8 @@ public class SocketServerConfigs {
public static final String NUM_NETWORK_THREADS_DOC = "The number of
threads that the server uses for receiving requests from the network and
sending responses to the network. Noted: each listener (except for controller
listener) creates its own thread pool.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH,
LISTENERS_DOC)
- .define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH,
ADVERTISED_LISTENERS_DOC)
+ .define(LISTENERS_CONFIG, LIST, LISTENERS_DEFAULT,
ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, LISTENERS_DOC)
+ .define(ADVERTISED_LISTENERS_CONFIG, LIST, null, HIGH,
ADVERTISED_LISTENERS_DOC)
Review Comment:
Should we validate that it can't be empty?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -561,8 +561,7 @@ public static void
validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boo
@SuppressWarnings("unchecked")
private static void
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) {
List<String> cleanupPolicy = (List<String>)
props.get(TopicConfig.CLEANUP_POLICY_CONFIG);
- Set<String> policySet = cleanupPolicy.stream().map(policy ->
policy.toLowerCase(Locale.getDefault())).collect(Collectors.toSet());
- if (!Set.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(policySet)) {
+ if (cleanupPolicy.size() != 1 ||
!TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0))) {
Review Comment:
We need to change the logic to accommodate for empty cleanupPolicy with
remote storage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]