junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2294227533
##########
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##########
@@ -260,11 +260,13 @@ abstract class QuorumTestHarness extends Logging {
     props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
     val proto = controllerListenerSecurityProtocol.toString
     val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc + ":" + sc).mkString(",")
-    val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",")
-    val listenerNames = extraControllerSecurityProtocols().mkString(",")
+    val listeners = extraControllerSecurityProtocols().map(sc => sc + "://localhost:0").mkString(",").trim
Review Comment:
Why is `trim` needed? Ditto below.
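For context on the reviewer's question: joining non-blank tokens with a separator never produces leading or trailing whitespace, so `trim` appears to be a no-op here. A minimal Java sketch (protocol names are illustrative stand-ins for `extraControllerSecurityProtocols()`):

```java
import java.util.List;
import java.util.stream.Collectors;

public class TrimCheck {
    public static void main(String[] args) {
        // Hypothetical protocol names; the real values come from the test harness.
        List<String> protocols = List.of("SSL", "SASL_SSL");
        String listeners = protocols.stream()
                .map(sc -> sc + "://localhost:0")
                .collect(Collectors.joining(","));
        // The joined string has no surrounding whitespace, so trim() returns it unchanged.
        System.out.println(listeners.equals(listeners.trim())); // prints true
    }
}
```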
##########
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##########
@@ -159,7 +159,9 @@ public class TopicConfig {
         "<a href=\"#compaction\">log compaction</a>, which retains the latest value for each key. " +
         "It is also possible to specify both policies in a comma-separated list (e.g. \"delete,compact\"). " +
         "In this case, old segments will be discarded per the retention time and size configuration, " +
-        "while retained segments will be compacted.";
+        "while retained segments will be compacted." +
+        "An empty list means infinite retention - no cleanup policies will be applied and log segments " +
+        "will be retained indefinitely.";
Review Comment:
It would be useful to mention that local retention is still enforced with
remote storage enabled. Also, could we update the doc for cleanup.policy in
ServerLogConfigs too?
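To illustrate the reviewer's point about local retention: a topic could pair an empty `cleanup.policy` (infinite remote retention) with remote storage while local segments remain bounded by the local retention settings. A hypothetical topic configuration sketch (values are illustrative, not recommendations):

```
cleanup.policy=
remote.storage.enable=true
# Local segments are still cleaned up per local retention, even with no cleanup policy.
local.retention.ms=86400000
```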
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -561,9 +561,8 @@ public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boo
     @SuppressWarnings("unchecked")
     private static void validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) {
         List<String> cleanupPolicy = (List<String>) props.get(TopicConfig.CLEANUP_POLICY_CONFIG);
-        Set<String> policySet = cleanupPolicy.stream().map(policy -> policy.toLowerCase(Locale.getDefault())).collect(Collectors.toSet());
-        if (!Set.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(policySet)) {
-            throw new ConfigException("Remote log storage only supports topics with cleanup.policy=delete");
+        if (!cleanupPolicy.isEmpty() && (cleanupPolicy.size() != 1 || !TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0)))) {
+            throw new ConfigException("Remote log storage only supports topics with cleanup.policy=delete or cleanup.policy is empty list.");
Review Comment:
cleanup.policy is empty list => cleanup.policy being an empty list
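A standalone sketch of the validation rule in the diff above, for clarity: only an empty list or exactly `["delete"]` passes when remote storage is enabled ("delete" stands in for `TopicConfig.CLEANUP_POLICY_DELETE`; the class and method names are hypothetical):

```java
import java.util.List;

public class CleanupPolicyValidation {
    // Mirrors the new condition in the diff: accept an empty policy list,
    // or a single-element list containing exactly "delete".
    static boolean isValidForRemoteStorage(List<String> cleanupPolicy) {
        return cleanupPolicy.isEmpty()
                || (cleanupPolicy.size() == 1 && "delete".equals(cleanupPolicy.get(0)));
    }

    public static void main(String[] args) {
        System.out.println(isValidForRemoteStorage(List.of()));                    // prints true
        System.out.println(isValidForRemoteStorage(List.of("delete")));            // prints true
        System.out.println(isValidForRemoteStorage(List.of("compact")));           // prints false
        System.out.println(isValidForRemoteStorage(List.of("delete", "compact"))); // prints false
    }
}
```

Note that, unlike the removed code, the new check compares case-sensitively rather than lower-casing the policy names first.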
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1879,16 +1879,25 @@ private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason rea
     /**
      * If topic deletion is enabled, delete any local log segments that have either expired due to time based
-     * retention or because the log size is > retentionSize. Whether or not deletion is enabled, delete any local
+     * retention or because the log size is > retentionSize. Empty cleanup.policy with remote storage enabled
+     * behaves the same as deletion policy. Whether or not deletion is enabled, delete any local
Review Comment:
Empty cleanup.policy with remote storage enabled behaves the same as
deletion policy => Empty cleanup.policy is the same as delete with infinite
retention. So, we only need to delete local segments if remote storage is
enabled.
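The semantics the reviewer describes can be condensed into a small decision sketch (names are illustrative, not the actual `UnifiedLog` API): local segments are deleted when the delete policy is set, or when the policy is empty and remote storage is enabled, since an empty policy means delete with infinite retention.

```java
public class RetentionSketch {
    // Hypothetical condensation of the rule from the review comment:
    // delete local segments if the delete policy is configured, or if the
    // policy is empty but remote storage holds the data.
    static boolean shouldDeleteLocalSegments(boolean policyContainsDelete,
                                             boolean policyIsEmpty,
                                             boolean remoteStorageEnabled) {
        return policyContainsDelete || (policyIsEmpty && remoteStorageEnabled);
    }

    public static void main(String[] args) {
        System.out.println(shouldDeleteLocalSegments(true,  false, false)); // prints true
        System.out.println(shouldDeleteLocalSegments(false, true,  true));  // prints true
        System.out.println(shouldDeleteLocalSegments(false, true,  false)); // prints false
    }
}
```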
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]