junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2326299192
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1906,16 +1906,29 @@ private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason rea
     /**
      * If topic deletion is enabled, delete any local log segments that have either expired due to time based
-     * retention or because the log size is > retentionSize. Whether or not deletion is enabled, delete any local
-     * log segments that are before the log start offset
+     * retention or because the log size is > retentionSize. Empty cleanup.policy is the same as delete with
+     * infinite retention, so we only need to delete local segments if remote storage is enabled. Whether or
+     * not deletion is enabled, delete any local log segments that are before the log start offset
      */
     public int deleteOldSegments() throws IOException {
         if (config().delete) {
             return deleteLogStartOffsetBreachedSegments() +
                 deleteRetentionSizeBreachedSegments() +
                 deleteRetentionMsBreachedSegments();
-        } else {
+        } else if (config().compact) {
             return deleteLogStartOffsetBreachedSegments();
+        } else {
+            // If cleanup.policy is empty and remote storage is enabled, the local log segments will
+            // be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms
+            if (remoteLogEnabledAndRemoteCopyEnabled()) {
+                return deleteLogStartOffsetBreachedSegments() +
+                    deleteRetentionSizeBreachedSegments() +
+                    deleteRetentionMsBreachedSegments();
+            } else {
+                // If cleanup.policy is empty and remote storage is disabled, we should not delete any local
+                // log segments
Review Comment:
> we should not delete any local log segments
This is a bit inaccurate. We probably want to say "we should not delete any local log segments unless the log start offset advances through deleteRecords".
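
For context, "advances through deleteRecords" refers to the Admin `deleteRecords` API. Below is a minimal, hypothetical sketch of a client advancing the log start offset; the topic name, partition, offset, and bootstrap address are placeholders for illustration, not anything taken from this PR:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Advance the log start offset of my-topic-0 to 100. Segments that lie entirely
            // below the new log start offset become eligible for deletion even when
            // retention-based cleanup does not apply.
            admin.deleteRecords(Map.of(
                    new TopicPartition("my-topic", 0),
                    RecordsToDelete.beforeOffset(100L)))
                .all()
                .get();
        }
    }
}
```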
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -716,26 +716,27 @@ public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {
-        final int targetInterceptor = 3;
+        final int targetInterceptor = 1;
         try {
             Properties props = new Properties();
             props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
             props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName() + ", "
-                + MockConsumerInterceptor.class.getName() + ", "
-                + MockConsumerInterceptor.class.getName());
+            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                CloseInterceptor.class.getName() + "," + MockConsumerInterceptor.class.getName());
             MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
             assertThrows(KafkaException.class, () -> newConsumer(
                     props, new StringDeserializer(), new StringDeserializer()));
-            assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
-            assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.CONFIG_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, CloseInterceptor.CLOSE_COUNT.get());
Review Comment:
This is an existing issue. So, the interceptors are called in reverse ordering?
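
For reference, the `CloseInterceptor` used in the hunk above is not shown here. Below is a minimal sketch of what such a counting interceptor could look like, under the assumption that it only records `close()` calls and configures successfully; the actual helper in the PR may differ:

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: counts close() calls so the test can verify that an interceptor
// instantiated before the failing one is still closed during constructor cleanup.
public class CloseInterceptor implements ConsumerInterceptor<String, String> {
    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {
        // no-op: configuration succeeds for this interceptor
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // no-op
    }

    @Override
    public void close() {
        CLOSE_COUNT.incrementAndGet();
    }
}
```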
##########
docs/upgrade.html:
##########
@@ -113,6 +113,35 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
     <li>
         The <code>num.replica.fetchers</code> config has a new lower bound of 1.
     </li>
+    <li>
+        Improvements have been made to the validation rules and default values of LIST-type configurations
+        (<a href="https://cwiki.apache.org/confluence/x/HArXF">KIP-1161</a>).
+        <ul>
+            <li>
+                LIST-type configurations now enforce stricter validation:
+                <ul>
+                    <li>Null values are no longer accepted for most LIST-type configurations, except those that explicitly
+                        allow a null default value or where a null value has a well-defined semantic meaning.</li>
+                    <li>Duplicate entries within the same list are no longer permitted.</li>
+                    <li>Empty lists are no longer allowed, except in configurations where an empty list has a well-defined
+                        semantic meaning.</li>
+                </ul>
+            </li>
+            <li>
+                Several configurations have been reclassified from STRING-type to LIST-type to better reflect their
+                intended use as comma-separated values.
+            </li>
+            <li>
+                Default values for certain configurations have been adjusted to ensure better consistency with related
+                settings.
+            </li>
+            <li>
+                The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
Review Comment:
The <code>cleanup.policy</code> is empty => If <code>cleanup.policy</code> is empty

Also, could we add "cleanup.policy supports empty, which means infinite retention."
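
As background for the suggested wording, here is a hedged sketch of setting this combination on a topic through the Admin client. The topic name, bootstrap address, and retention value are illustrative placeholders, and acceptance of an empty <code>cleanup.policy</code> follows the KIP-1161 behavior described in this upgrade note:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EmptyCleanupPolicyExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Collection<AlterConfigOp> ops = List.of(
                    // Empty cleanup.policy: neither compaction nor retention-based deletion
                    // of the total log, i.e. infinite retention.
                    new AlterConfigOp(new ConfigEntry("cleanup.policy", ""), AlterConfigOp.OpType.SET),
                    // With remote storage enabled, local segments are still trimmed according
                    // to local.retention.ms / local.retention.bytes once they are uploaded.
                    new AlterConfigOp(new ConfigEntry("remote.storage.enable", "true"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("local.retention.ms", "86400000"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```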