malletgu commented on code in PR #16681:
URL: https://github.com/apache/kafka/pull/16681#discussion_r1724975609
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
long logStartOffset = log.logStartOffset();
long logEndOffset = log.logEndOffset();
- Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
- log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
- Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
-
- RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
- Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
- boolean canProcess = true;
- List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
long sizeOfDeletableSegmentsBytes = 0L;
- while (canProcess && epochIterator.hasNext()) {
- Integer epoch = epochIterator.next();
- Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
- while (canProcess && segmentsIterator.hasNext()) {
- if (isCancelled()) {
- logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
- return;
- }
- RemoteLogSegmentMetadata metadata = segmentsIterator.next();
- if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
- logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments",
- metadata.remoteLogSegmentId());
- canProcess = false;
- continue;
- }
- if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
+ final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
+ final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>();
+ for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) {
+ Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch);
+ while (it.hasNext()) {
+ final RemoteLogSegmentMetadata segment = it.next();
+
+ // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling
+ if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) {
+ sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes();
+ segmentsToDelete.add(segment);
continue;
}
- if (segmentsToDelete.contains(metadata)) {
+
+ if (segment.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
+ segmentsToDelete.add(segment);
+ sizeOfDeletableSegmentsBytes += segment.segmentSizeInBytes();
continue;
}
Review Comment:
Yes, this is correct. The most important part is that those segments are no longer passed to `buildRetentionSizeData` afterwards, which is what we used to do before.
> We will check COPY_SEGMENT_STARTED and DELETE_SEGMENT_STARTED here, but DELETE_SEGMENT_FINISHED in L1244. I don't think it's readable. Could you make it clear?

I'll refactor this.
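One way to make the state handling easier to follow is to classify each epoch's listing in a single pass with one `switch`. This is a simplified, hypothetical sketch rather than the actual refactor: `SegState`, `Seg` and `SegmentClassifier` are stand-ins invented here for `RemoteLogSegmentState` and `RemoteLogSegmentMetadata`. Treating the last `COPY_SEGMENT_STARTED` entry as neither deletable nor counted is also an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for RemoteLogSegmentState (same constant names).
enum SegState { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

// Hypothetical stand-in for RemoteLogSegmentMetadata, keeping only what this sketch needs.
final class Seg {
    final String id;
    final SegState state;
    final long sizeInBytes;

    Seg(String id, SegState state, long sizeInBytes) {
        this.id = id;
        this.state = state;
        this.sizeInBytes = sizeInBytes;
    }
}

// Classifies one leader epoch's listing in a single pass, with every state handled in one place.
final class SegmentClassifier {
    final List<Seg> toDelete = new ArrayList<>();
    final List<Seg> valid = new ArrayList<>();
    long deletableBytes = 0L;

    void classify(Iterator<Seg> it) {
        while (it.hasNext()) {
            Seg s = it.next();
            switch (s.state) {
                case COPY_SEGMENT_STARTED:
                    // Dangling only if another entry follows; the last one may still be
                    // in flight, so in this sketch it is neither deleted nor counted.
                    if (it.hasNext()) {
                        markForDeletion(s);
                    }
                    break;
                case DELETE_SEGMENT_STARTED:
                    // A deletion that was started but not finished is picked up again.
                    markForDeletion(s);
                    break;
                case COPY_SEGMENT_FINISHED:
                    // Only fully copied segments feed the retention-size computation.
                    valid.add(s);
                    break;
                case DELETE_SEGMENT_FINISHED:
                    // Already removed from remote storage: ignored.
                    break;
            }
        }
    }

    private void markForDeletion(Seg s) {
        toDelete.add(s);
        deletableBytes += s.sizeInBytes;
    }

    long validBytes() {
        long total = 0L;
        for (Seg s : valid) {
            total += s.sizeInBytes;
        }
        return total;
    }
}
```

Only `valid` would then be handed to the retention-size logic, which keeps in-flight and half-deleted segments out of the `retention.bytes` accounting.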
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
+ // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling
+ if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) {
Review Comment:
Correct, it relies on each `RemoteLogSegmentMetadata` being different.
> if there is only COPY_SEGMENT_STARTED and has no next state, it must be a dangling one, right?

We used to make this assumption, but now that copy and cleanup happen in parallel, a segment can be in `COPY_SEGMENT_STARTED` simply because it is actually being copied.
This is also why I avoid calling `remoteLogMetadataManager.listRemoteLogSegments` a second time: the list of segments could change between consecutive calls within this function.
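A small, hypothetical illustration of why the listing is read only once per run. `FakeMetadataStore` and `SnapshotExample` are invented for this sketch: the store stands in for `remoteLogMetadataManager.listRemoteLogSegments`, entries are plain state names, and a concurrent copy attempt is simulated by appending between two listings. `CopyOnWriteArrayList` is used because its iterator is a point-in-time view, which loosely mirrors the result of a single call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory store standing in for listRemoteLogSegments.
// Each entry is the state name of one segment, in listing order.
final class FakeMetadataStore {
    private final List<String> states = new CopyOnWriteArrayList<>();

    void add(String state) {
        states.add(state);
    }

    // A CopyOnWriteArrayList iterator reflects the list as it was when the iterator was created.
    Iterator<String> listSegments() {
        return states.iterator();
    }
}

final class SnapshotExample {
    // Drain the iterator once; every later decision in the same run uses this list.
    static List<String> snapshot(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    // COPY_SEGMENT_STARTED entries followed by another entry are treated as dangling.
    static int countDangling(List<String> listing) {
        int dangling = 0;
        for (int i = 0; i < listing.size() - 1; i++) {
            if (listing.get(i).equals("COPY_SEGMENT_STARTED")) {
                dangling++;
            }
        }
        return dangling;
    }
}
```

If the store holds a finished segment followed by a `COPY_SEGMENT_STARTED` one, the first snapshot sees two entries and nothing dangling. A second listing taken after a new copy attempt sees three and would classify the middle entry as dangling, so reusing the snapshot keeps the deletion and size decisions of one cleanup run consistent.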
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.log.LogTestUtils;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager;
+import kafka.log.remote.quota.RLMQuotaManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.AppendOrigin;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import scala.Option;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogManagerInteractionTest {
+ private static final Random RANDOM = new Random(0);
+
+ private KafkaConfig config;
+ private final String clusterId = "dummyId";
+ private final int retentionSize = 10_000_000;
+ private final File logDir = TestUtils.tempDirectory("kafka-");
+ private final File partitionDir = kafka.utils.TestUtils.randomPartitionLogDir(logDir);
+ private BrokerTopicStats brokerTopicStats;
+ private final MockTime mockTime = new MockTime();
+ private Uuid topicId;
+ private UnifiedLog log;
+ private final Metrics metrics = new Metrics();
+
+ private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class);
+ private final InMemoryRemoteLogMetadataManager remoteLogMetadataManager = new InMemoryRemoteLogMetadataManager();
+ private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
+ private RemoteLogManager remoteLogManager;
+
+ private final String remoteLogStorageTestProp = "remote.log.storage.test";
+ private final String remoteLogStorageTestVal = "storage.test";
+
+ private TopicIdPartition topicIdPartition;
+
+
+ @BeforeEach
+ void setUp() throws Exception {
+ Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
+ props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+ props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
+ appendRLMConfig(props);
+ config = KafkaConfig.fromProps(props);
+ topicId = Uuid.randomUuid();
+ topicIdPartition = new TopicIdPartition(topicId, new TopicPartition("test", 0));
+ brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
+
+ HashMap<String, Object> properties = new HashMap<>();
+ properties.put("retention.bytes", retentionSize);
+ properties.put("local.retention.bytes", "1");
+ properties.put("remote.storage.enable", "true");
+
+ LogConfig logConfig = new LogConfig(properties);
+ log = LogTestUtils.createLog(
+ partitionDir, logConfig, brokerTopicStats, mockTime.scheduler, mockTime, 0L, 0L, 5 * 60 * 1000,
+ new ProducerStateManagerConfig(86400000, false), 600000, true,
+ Option.apply(topicId), true, new ConcurrentHashMap<>(), true, null, LogOffsetsListener.NO_OP_OFFSETS_LISTENER
+ );
+
+ remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), 1, logDir.toString(), clusterId, mockTime,
+ tp -> Optional.of(log),
+ (topicPartition, offset) -> log.maybeIncrementLogStartOffset(offset, LogStartOffsetIncrementReason.SegmentDeletion),
+ brokerTopicStats, metrics) {
+ public RemoteStorageManager createRemoteStorageManager() {
+ return remoteStorageManager;
+ }
+
+ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+ return remoteLogMetadataManager;
+ }
+
+ public RLMQuotaManager createRLMCopyQuotaManager() {
+ return rlmCopyQuotaManager;
+ }
+
+ public Duration quotaTimeout() {
+ return Duration.ofMillis(100);
+ }
+
+ @Override
+ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
+ return 0L;
+ }
+ };
+ remoteLogMetadataManager.initialise(topicIdPartition);
+ remoteLogManager.startup();
+ }
+
+ @AfterEach
+ void cleanup() {
+ log.close();
+ }
+
+ private void appendRLMConfig(Properties props) {
+ props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
+ props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName());
+ props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName());
+ props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + remoteLogStorageTestProp, remoteLogStorageTestVal);
+ }
+
+ /**
+ * Builds a segment of the given size and append it to the log, then roll the segment after.
+ * Each segment has some overhead of around 70bytes which impact log retention policy.
+ *
+ * @param size size of the segment to create
+ * @param count number of messages to create in the segment
+ */
+ private void buildSegment(int size, int count) {
+ byte[] bytes = new byte[size / count];
+ for (int i = 0; i < count; i++) {
+ RANDOM.nextBytes(bytes);
+ MemoryRecords records = MemoryRecords.withRecords(
+ RecordBatch.MAGIC_VALUE_V2, 0, Compression.NONE, TimestampType.CREATE_TIME,
+ new SimpleRecord(bytes)
+ );
+ log.appendAsLeader(records, 0, AppendOrigin.CLIENT,
+ MetadataVersion.latestTesting(), RequestLocal.NoCaching(), VerificationGuard.SENTINEL
+ );
+ }
+ log.roll(Option.empty());
+ }
+
+ /**
+ * Builds a segment of the given size with a default of 100 messages and append it to the log
+ * then roll the segment after.
+ * Each segment has some overhead of around 70bytes which impact log retention policy.
+ *
+ * @param size size of the segment to create
+ */
+ private void buildSegment(int size) {
+ buildSegment(size, 100);
+ }
+
+ @Test
+ void nonActiveSegmentsAreCorrectlyWritten() throws RemoteStorageException {
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+
+ buildSegment(retentionSize);
+ buildSegment(retentionSize);
+ buildSegment(retentionSize);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+
+ task.run();
+
+ verify(remoteStorageManager, times(3)).copyLogSegmentData(any(), any());
+ }
+
+ @Test
+ void failedSegmentAreRetried() throws RemoteStorageException {
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+
+ buildSegment(retentionSize);
+ buildSegment(retentionSize);
+ buildSegment(retentionSize);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+
+ when(remoteStorageManager.copyLogSegmentData(any(), any()))
+ .thenThrow(new RemoteStorageException(""))
+ .thenReturn(Optional.empty());
+
+ task.run();
+ task.run();
+
+ verify(remoteStorageManager, times(4)).copyLogSegmentData(any(), any());
+ }
+
+ @Test
+ void failedSegmentsAreNotAccountedInSizeForRetention() throws RemoteStorageException {
+ // We're testing the scenario where the first segment is correctly pushed then the next segment fails to get
+ // uploaded multiple times before succeeding again.
+ // At the end we should see no call to delete as retention.bytes was never breached
+ RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+ RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition);
+
+ buildSegment(retentionSize / 3);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+ when(remoteStorageManager.copyLogSegmentData(any(), any())).thenReturn(Optional.empty());
+ copyTask.run();
+
+ buildSegment(retentionSize / 3);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+ when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
+ for (int i = 0; i < 4; i++) {
+ copyTask.run();
+ }
+ doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any());
+ copyTask.run();
+
+ buildSegment(retentionSize / 3);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+ copyTask.run();
+ cleanupTask.run();
+
+ // total is 1 (success) + 4 (failures) + 2x1 (success)
+ verify(remoteStorageManager, times(7)).copyLogSegmentData(any(), any());
+ verify(remoteStorageManager, never()).deleteLogSegmentData(
+ argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED))
+ );
+ verify(remoteStorageManager, times(4)).deleteLogSegmentData(
+ argThat(segment -> segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED))
+ );
Review Comment:
I'll add this as a comment, but the idea is that, because the metadata for this topic partition contains a `COPY_SEGMENT_FINISHED` after the `COPY_SEGMENT_STARTED` entries, all the `COPY_SEGMENT_STARTED` segments are seen as dangling and are deleted.
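A rough replay of the metadata this test should leave behind, assuming a single leader epoch and that the listing returns the latest state of each segment id in the order shown. Each copy attempt is modelled as one metadata entry, so the seven entries line up with `times(7)`. `DanglingReplay` and its methods are hypothetical names, and only the "all `COPY_SEGMENT_STARTED` entries but the last one" rule is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the expected metadata listing for this test (single leader epoch assumed).
final class DanglingReplay {
    static List<String> expectedListing() {
        List<String> listing = new ArrayList<>();
        listing.add("COPY_SEGMENT_FINISHED");      // first segment, copied on the first run
        for (int i = 0; i < 4; i++) {
            listing.add("COPY_SEGMENT_STARTED");   // second segment, four failed attempts
        }
        listing.add("COPY_SEGMENT_FINISHED");      // second segment, successful retry
        listing.add("COPY_SEGMENT_FINISHED");      // third segment
        return listing;
    }

    // Counts COPY_SEGMENT_STARTED entries that are followed by another entry (the "dangling" rule).
    static int danglingCount(List<String> listing) {
        int dangling = 0;
        for (int i = 0; i < listing.size() - 1; i++) {
            if (listing.get(i).equals("COPY_SEGMENT_STARTED")) {
                dangling++;
            }
        }
        return dangling;
    }
}
```

Under this model, all four failed attempts are followed by later entries, so all four are deleted as dangling. No `COPY_SEGMENT_FINISHED` entry is touched by that rule, which matches the `times(4)` and `never()` verifications.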
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java:
##########
@@ -0,0 +1,323 @@
+ @Test
+ void failedSegmentsAreNotAccountedInSizeDuringDeletion() throws RemoteStorageException {
+ RemoteLogManager.RLMTask copyTask = remoteLogManager.new RLMCopyTask(topicIdPartition, 128);
+ RemoteLogManager.RLMTask cleanupTask = remoteLogManager.new RLMExpirationTask(topicIdPartition);
+
+ buildSegment(retentionSize - 100);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+ when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
+ for (int i = 0; i < 4; i++) {
+ copyTask.run();
+ }
+ doReturn(Optional.empty()).when(remoteStorageManager).copyLogSegmentData(any(), any());
+ copyTask.run();
+
+ buildSegment(retentionSize - 100);
+ log.maybeUpdateHighWatermark(log.logEndOffset());
+ copyTask.run();
+ cleanupTask.run();
+
+ // total is 1 (success) + 4 (failures) + 1 (success)
+ verify(remoteStorageManager, times(6)).copyLogSegmentData(any(), any());
+
+ // The first segment should now be deleted
+ verify(remoteStorageManager, times(5)).deleteLogSegmentData(
+ argThat(segment -> segment.startOffset() == 0)
+ );
Review Comment:
```
when(remoteStorageManager.copyLogSegmentData(any(), any())).thenThrow(new RemoteStorageException(""));
for (int i = 0; i < 4; i++) {
    copyTask.run();
}
```
This portion creates 4 failed `COPY_SEGMENT_STARTED` segments, and a successful run follows. So in total we have 5 segment metadata entries referring to the first segment.
All of them are eligible for deletion under the `retention.bytes` policy because we pushed a second segment after it, which is why the 4 failures + 1 success are deleted.
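The counting can be sketched as a replay under several assumptions: a single leader epoch, the four failed attempts listed before the successful one, and a simplified size check that drops the oldest finished segments while their total exceeds the limit. That check is not the exact `RemoteLogRetentionHandler` logic. `RetentionReplay`, `Entry` and the byte sizes are hypothetical: 990 bytes per segment against a 1000-byte limit only loosely mirrors `retentionSize - 100`, and the second segment's start offset of 100 follows from the 100 records per segment in `buildSegment`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the deletion test: dangling rule plus a simplified retention.bytes check.
final class RetentionReplay {
    // One segment metadata entry: start offset, whether its copy finished, and its size.
    static final class Entry {
        final long startOffset;
        final boolean finished; // true = COPY_SEGMENT_FINISHED, false = COPY_SEGMENT_STARTED
        final long size;

        Entry(long startOffset, boolean finished, long size) {
            this.startOffset = startOffset;
            this.finished = finished;
            this.size = size;
        }
    }

    static List<Entry> deleted(List<Entry> listing, long retentionBytes) {
        List<Entry> out = new ArrayList<>();
        List<Entry> valid = new ArrayList<>();
        for (int i = 0; i < listing.size(); i++) {
            Entry e = listing.get(i);
            if (!e.finished && i < listing.size() - 1) {
                out.add(e);      // dangling COPY_SEGMENT_STARTED entry
            } else if (e.finished) {
                valid.add(e);    // only finished segments count towards retention.bytes
            }
        }
        long total = 0L;
        for (Entry e : valid) {
            total += e.size;
        }
        // Simplified check: drop the oldest finished segments while the total exceeds the limit.
        for (Entry e : valid) {
            if (total <= retentionBytes) {
                break;
            }
            out.add(e);
            total -= e.size;
        }
        return out;
    }
}
```

In this model, the four failed entries go through the dangling rule and the finished one through the size check. Either way, all five entries with start offset 0 are deleted and the second segment survives.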
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerInteractionTest.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.log.LogTestUtils;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.quota.InMemoryRemoteLogMetadataManager;
+import kafka.log.remote.quota.RLMQuotaManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.KafkaConfig;
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.AppendOrigin;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.kafka.storage.internals.log.LogOffsetsListener;
+import org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason;
+import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import scala.Option;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogManagerInteractionTest {
Review Comment:
The idea behind this class is to avoid mocking every call to `UnifiedLog` and the RLMM (`RemoteLogMetadataManager`), as those mocks can easily become quite long (see
[RemoteLogManagerTest](https://github.com/apache/kafka/blob/0eaaff88cf68bc2c24d4874ff9bc1cc2b493c24b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java#L3035)).
Since I was mostly testing successive calls between copy and cleanup, I think this makes the test clearer to read than having more mocks between each call.
##########
core/src/test/java/kafka/log/remote/quota/InMemoryRemoteLogMetadataManager.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
Review Comment:
Good point, I am going to move it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.