exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2931926951
##########
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##########
@@ -161,6 +162,30 @@ public void markDestructable(final ResourceClaim claim) {
}
}
+ @Override
+ public void markTruncatable(final ContentClaim contentClaim) {
+ if (contentClaim == null) {
+ return;
+ }
+
+ final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+ synchronized (resourceClaim) {
+ if (isDestructable(resourceClaim)) {
+ return;
+ }
+
+ logger.debug("Marking {} as truncatable", contentClaim);
+ try {
+ if (!truncatableClaims.offer(contentClaim, 1,
TimeUnit.MINUTES)) {
+ logger.debug("Unable to mark {} as truncatable because the
queue is full.", contentClaim);
Review Comment:
It seems like this would be better as an `INFO` level, and it would be
useful to include the queue size.
```suggestion
logger.info("Unable to mark {} as truncatable because
maximum queue size [{}] reached", contentClaim, truncatableClaims.size());
```
##########
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@DefaultSchedule(period = "10 mins")
+public class GenerateTruncatableFlowFiles extends AbstractProcessor {
+
+ static final PropertyDescriptor BATCH_COUNT = new
PropertyDescriptor.Builder()
+ .name("Batch Count")
+ .description("The maximum number of batches to generate. Each batch
produces 10 FlowFiles (9 small + 1 large). "
+ + "Once this many batches have been generated, no more
FlowFiles will be produced until the processor is stopped and restarted.")
Review Comment:
Recommend using a multi-line string
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation,
String newPartitionName)
return swapLocation;
}
}
+
+ //
=========================================================================
+ // Truncation Feature: Helpers
+ //
=========================================================================
+
+ /**
+ * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+ private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+ }
+
+ private RuntimeRepoContext createRuntimeRepoContext() {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+ return new RuntimeRepoContext(claimManager, queueProvider, queue);
+ }
+
+ private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
+ final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
+ claim.setLength(length);
+ if (truncationCandidate) {
+ claim.setTruncationCandidate(true);
+ }
+ return claim;
+ }
+
+ private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws
IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(claim)
+ .build();
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
+ createRecord.setWorking(flowFile, false);
+ createRecord.setDestination(queue);
+ repo.updateRepository(List.of(createRecord));
+
+ final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
+ deleteRecord.markForDelete();
+ repo.updateRepository(List.of(deleteRecord));
+ }
+
+ /**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+ private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
+ final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+ final Connection writeConnection = Mockito.mock(Connection.class);
+ when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+ when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+ writeQueueProvider.addConnection(writeConnection);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(writeClaimManager);
+ repo.loadFlowFiles(writeQueueProvider);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ for (int i = 0; i < claims.length; i++) {
+ final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+ .id(i + 1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
+ .contentClaim(claims[i])
+ .build();
+ final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
+ rec.setWorking(ff, false);
+ rec.setDestination(writeQueue);
+ records.add(rec);
+ }
+ repo.updateRepository(records);
+ }
+
+ // Recover
+ final List<FlowFileRecord> recovered = new ArrayList<>();
+ final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+ when(recoveryQueue.getIdentifier()).thenReturn("1234");
+ doAnswer(invocation -> {
+ recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+ return null;
+ }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+ final Connection recoveryConnection = Mockito.mock(Connection.class);
+ when(recoveryConnection.getIdentifier()).thenReturn("1234");
+ when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+ final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
+ recoveryQueueProvider.addConnection(recoveryConnection);
+
+ try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(recoveryQueueProvider);
+ }
+
+ return recovered;
+ }
+
+ private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
+ return recovered.stream()
+ .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
+ .findFirst()
+ .orElse(null);
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Runtime Tests
+ //
=========================================================================
+
+ @Test
+ public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+ final ResourceClaim rc =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc);
+ ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that
after delete decrement it stays > 0 (not destructable)
+ final StandardContentClaim contentClaim = createClaim(rc, 1024L,
5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+ createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(contentClaim), "Truncatable claim should
have been routed to the truncation queue");
+ }
+
+ @Test
+ public void testDestructableClaimTakesPriorityOverTruncatable() throws
IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+ final ResourceClaim rc =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will
reach 0 after delete
+ final StandardContentClaim contentClaim = createClaim(rc, 1024L,
5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+ createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ResourceClaim> destructed = new ArrayList<>();
+ ctx.claimManager().drainDestructableClaims(destructed, 100);
+ assertTrue(destructed.contains(rc), "Resource claim should be
destructable");
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertFalse(truncated.contains(contentClaim), "Truncatable claim
should NOT be in truncation queue when resource claim is destructable");
+ }
+
+ @Test
+ public void testUpdateRecordOriginalClaimQueuedForTruncation() throws
IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+
+ final ResourceClaim rc1 =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc1);
+ ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it
stays > 0 after decrement
+ final StandardContentClaim originalClaim = createClaim(rc1, 2048L,
5_000_000L, true);
+
+ final ResourceClaim rc2 =
ctx.claimManager().newResourceClaim("container", "section", "2", false, false);
+ ctx.claimManager().incrementClaimantCount(rc2);
+ final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L,
false);
+
+ final FlowFileRecord originalFlowFile = new
StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(originalClaim)
+ .build();
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(ctx.queue());
+ createRecord.setWorking(originalFlowFile, false);
+ createRecord.setDestination(ctx.queue());
+ repo.updateRepository(List.of(createRecord));
+
+ final FlowFileRecord updatedFlowFile = new
StandardFlowFileRecord.Builder()
+ .fromFlowFile(originalFlowFile)
+ .contentClaim(newClaim)
+ .build();
+ final StandardRepositoryRecord updateRecord = new
StandardRepositoryRecord(ctx.queue(), originalFlowFile);
+ updateRecord.setWorking(updatedFlowFile, true);
+ updateRecord.setDestination(ctx.queue());
+ repo.updateRepository(List.of(updateRecord));
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(originalClaim), "Original claim should
have been queued for truncation after content change");
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Recovery Tests
+ //
=========================================================================
+
+ @Test
+ public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws
IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L,
false);
+ final StandardContentClaim largeClaim = createClaim(rc, 100L,
2_000_000L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(smallClaim,
largeClaim);
+
+ final FlowFileRecord recoveredLargeFF =
findRecoveredByOffset(recovered, 100L);
+ assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with
the large claim");
+ assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(),
+ "Large tail claim should be marked as truncation candidate
after recovery");
+
+ final FlowFileRecord recoveredSmallFF =
findRecoveredByOffset(recovered, 0L);
+ assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with
the small claim");
+ assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(),
+ "Small claim at offset 0 should NOT be a truncation candidate
after recovery");
+ }
+
+ @Test
+ public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim sharedClaim = createClaim(rc, 100L,
2_000_000L, false);
+
+ // Two FlowFiles sharing the same claim (clone scenario)
+ final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim,
sharedClaim);
+
+ for (final FlowFileRecord ff : recovered) {
+ if (ff.getContentClaim() != null) {
+ assertFalse(ff.getContentClaim().isTruncationCandidate(),
+ "Cloned/shared claim should NOT be a truncation
candidate");
+ }
+ }
+ }
+
+ @Test
+ public void testRecoveryOnlyMarksTailClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L,
false);
+ final StandardContentClaim claim2 = createClaim(rc, 2_000_100L,
3_000_000L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(claim1, claim2);
+
+ final FlowFileRecord tailFF = findRecoveredByOffset(recovered,
2_000_100L);
+ assertNotNull(tailFF, "Should have recovered the tail claim FlowFile");
+ assertTrue(tailFF.getContentClaim().isTruncationCandidate(),
+ "Only the tail claim should be a truncation candidate");
+
+ final FlowFileRecord nonTailFF = findRecoveredByOffset(recovered,
100L);
+ assertNotNull(nonTailFF, "Should have recovered the non-tail claim
FlowFile");
+ assertFalse(nonTailFF.getContentClaim().isTruncationCandidate(),
+ "Non-tail large claim should NOT be a truncation candidate");
+ }
+
+ @Test
+ public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws
IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim smallClaim1 = createClaim(rc, 0L, 100L,
false);
+ final StandardContentClaim largeClaim = createClaim(rc, 100L,
2_000_000L, false);
+ final StandardContentClaim smallClaim2 = createClaim(rc, 2_000_100L,
50L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(smallClaim1,
largeClaim, smallClaim2);
+
+ for (final FlowFileRecord ff : recovered) {
+ if (ff.getContentClaim() != null) {
Review Comment:
Is it possible for all of the recovered records to have `null` content
Claims? It looks like it would be better to get a filtered list of non-null
recovered records, assert that the list is not empty, and then check the
truncation candidate status.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation,
String newPartitionName)
return swapLocation;
}
}
+
+ //
=========================================================================
+ // Truncation Feature: Helpers
+ //
=========================================================================
+
+ /**
+ * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+ private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+ }
+
+ private RuntimeRepoContext createRuntimeRepoContext() {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+ return new RuntimeRepoContext(claimManager, queueProvider, queue);
+ }
+
+ private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
+ final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
+ claim.setLength(length);
+ if (truncationCandidate) {
+ claim.setTruncationCandidate(true);
+ }
+ return claim;
+ }
+
+ private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws
IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(claim)
+ .build();
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
+ createRecord.setWorking(flowFile, false);
+ createRecord.setDestination(queue);
+ repo.updateRepository(List.of(createRecord));
+
+ final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
+ deleteRecord.markForDelete();
+ repo.updateRepository(List.of(deleteRecord));
+ }
+
+ /**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+ private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
+ final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+ final Connection writeConnection = Mockito.mock(Connection.class);
+ when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+ when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+ writeQueueProvider.addConnection(writeConnection);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(writeClaimManager);
+ repo.loadFlowFiles(writeQueueProvider);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ for (int i = 0; i < claims.length; i++) {
+ final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+ .id(i + 1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
+ .contentClaim(claims[i])
+ .build();
+ final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
+ rec.setWorking(ff, false);
+ rec.setDestination(writeQueue);
+ records.add(rec);
+ }
+ repo.updateRepository(records);
+ }
+
+ // Recover
+ final List<FlowFileRecord> recovered = new ArrayList<>();
+ final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+ when(recoveryQueue.getIdentifier()).thenReturn("1234");
+ doAnswer(invocation -> {
+ recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+ return null;
+ }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+ final Connection recoveryConnection = Mockito.mock(Connection.class);
+ when(recoveryConnection.getIdentifier()).thenReturn("1234");
+ when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+ final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
+ recoveryQueueProvider.addConnection(recoveryConnection);
+
+ try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(recoveryQueueProvider);
+ }
+
+ return recovered;
+ }
+
+ private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
+ return recovered.stream()
+ .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
+ .findFirst()
+ .orElse(null);
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Runtime Tests
+ //
=========================================================================
+
+ @Test
+ public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+ final ResourceClaim rc =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc);
+ ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that
after delete decrement it stays > 0 (not destructable)
+ final StandardContentClaim contentClaim = createClaim(rc, 1024L,
5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+ createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(contentClaim), "Truncatable claim should
have been routed to the truncation queue");
+ }
+
+ @Test
+ public void testDestructableClaimTakesPriorityOverTruncatable() throws
IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+ final ResourceClaim rc =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will
reach 0 after delete
+ final StandardContentClaim contentClaim = createClaim(rc, 1024L,
5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+ createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ResourceClaim> destructed = new ArrayList<>();
+ ctx.claimManager().drainDestructableClaims(destructed, 100);
+ assertTrue(destructed.contains(rc), "Resource claim should be
destructable");
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertFalse(truncated.contains(contentClaim), "Truncatable claim
should NOT be in truncation queue when resource claim is destructable");
+ }
+
+ @Test
+ public void testUpdateRecordOriginalClaimQueuedForTruncation() throws
IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+
+ final ResourceClaim rc1 =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc1);
+ ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it
stays > 0 after decrement
+ final StandardContentClaim originalClaim = createClaim(rc1, 2048L,
5_000_000L, true);
+
+ final ResourceClaim rc2 =
ctx.claimManager().newResourceClaim("container", "section", "2", false, false);
+ ctx.claimManager().incrementClaimantCount(rc2);
+ final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L,
false);
+
+ final FlowFileRecord originalFlowFile = new
StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(originalClaim)
+ .build();
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(ctx.queue());
+ createRecord.setWorking(originalFlowFile, false);
+ createRecord.setDestination(ctx.queue());
+ repo.updateRepository(List.of(createRecord));
+
+ final FlowFileRecord updatedFlowFile = new
StandardFlowFileRecord.Builder()
+ .fromFlowFile(originalFlowFile)
+ .contentClaim(newClaim)
+ .build();
+ final StandardRepositoryRecord updateRecord = new
StandardRepositoryRecord(ctx.queue(), originalFlowFile);
+ updateRecord.setWorking(updatedFlowFile, true);
+ updateRecord.setDestination(ctx.queue());
+ repo.updateRepository(List.of(updateRecord));
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(originalClaim), "Original claim should
have been queued for truncation after content change");
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Recovery Tests
+ //
=========================================================================
+
+ @Test
+ public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws
IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L,
false);
+ final StandardContentClaim largeClaim = createClaim(rc, 100L,
2_000_000L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(smallClaim,
largeClaim);
+
+ final FlowFileRecord recoveredLargeFF =
findRecoveredByOffset(recovered, 100L);
+ assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with
the large claim");
+ assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(),
+ "Large tail claim should be marked as truncation candidate
after recovery");
+
+ final FlowFileRecord recoveredSmallFF =
findRecoveredByOffset(recovered, 0L);
+ assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with
the small claim");
+ assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(),
+ "Small claim at offset 0 should NOT be a truncation candidate
after recovery");
+ }
+
+ @Test
+ public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim sharedClaim = createClaim(rc, 100L,
2_000_000L, false);
+
+ // Two FlowFiles sharing the same claim (clone scenario)
+ final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim,
sharedClaim);
+
+ for (final FlowFileRecord ff : recovered) {
+ if (ff.getContentClaim() != null) {
+ assertFalse(ff.getContentClaim().isTruncationCandidate(),
+ "Cloned/shared claim should NOT be a truncation
candidate");
+ }
+ }
+ }
+
+ @Test
+ public void testRecoveryOnlyMarksTailClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L,
false);
Review Comment:
The sizes in this method and several others appear to be reused multiple
times, so it looks like it would be helpful to use some static variables for a
few of the repeated values.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -786,6 +849,16 @@ public long loadFlowFiles(final QueueProvider
queueProvider) throws IOException
// If recoveredRecords has been populated it need to be nulled out now
because it is no longer useful and can be garbage collected.
recoveredRecords = null;
+ // If any Content Claim was determined to be truncatable, mark it as
such now.
+ for (final StandardContentClaim eligible : truncationEligibleClaims) {
+ final ContentClaim latestForResource =
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
+ if (!Objects.equals(eligible, latestForResource)) {
+ continue;
+ }
+
+ eligible.setTruncationCandidate(true);
Review Comment:
Instead of checking not-equals and continuing, it seems like this could be
changed to check equals, and then conditionally set `true`
##########
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##########
@@ -32,6 +32,7 @@ public class StandardResourceClaimManager implements
ResourceClaimManager {
private static final Logger logger =
LoggerFactory.getLogger(StandardResourceClaimManager.class);
private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts =
new ConcurrentHashMap<>();
private final BlockingQueue<ResourceClaim> destructableClaims = new
LinkedBlockingQueue<>(50000);
+ private final BlockingQueue<ContentClaim> truncatableClaims = new
LinkedBlockingQueue<>(100000);
Review Comment:
Is there any particular reason for selecting 100,000 as the queue size? Is
it related to the destructableClaims size, or just a reasonably high limit? If
there is any particular reason, it would be helpful to add a comment for future
reference.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1047,119 @@ public void purge() {
resourceClaimManager.purge();
}
+ private class TruncateClaims implements Runnable {
+
+ @Override
+ public void run() {
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
+ while (true) {
Review Comment:
The `while (true) {` construction always gives me pause, although the
`return` clarifies expected behavior. Since this Runnable is executed on a
schedule, is it necessary to have this loop, as opposed to just waiting for the
next invocation from the scheduler?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1903,4 +2064,41 @@ public synchronized ContentClaim newContentClaim()
throws IOException {
}
}
+ private static class TruncationClaimManager {
+ private static final int MAX_THRESHOLD = 100_000;
+ private final Map<String, List<ContentClaim>> truncationClaims = new
HashMap<>();
+
+ public synchronized void addTruncationClaims(final String container,
final List<ContentClaim> claim) {
Review Comment:
Do these class methods need to be `public`?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ final long maxClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
Review Comment:
Is there a reason for parsing this property as opposed to just declaring the
static max claim length in this test class?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1047,119 @@ public void purge() {
resourceClaimManager.purge();
}
+ private class TruncateClaims implements Runnable {
+
+ @Override
+ public void run() {
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
+ while (true) {
+ final List<ContentClaim> toTruncate = new ArrayList<>();
+ resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
+ if (toTruncate.isEmpty()) {
+ return;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
+ final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
+
+ for (final ContentClaim claim : toTruncate) {
+ final String container =
claim.getResourceClaim().getContainer();
+ if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation.", claim, container);
+ claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
+ continue;
+ }
+
+ if (claim.isTruncationCandidate()) {
+ truncate(claim);
+ }
+ }
+
+ claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ private boolean isTruncationActiveForContainer(final String container,
final Map<String, Boolean> activationCache) {
+ // If not archiving data, we consider truncation always active.
+ if (!archiveData) {
+ return true;
+ }
+
+ final Boolean cachedValue = activationCache.get(container);
+ if (cachedValue != null) {
+ return cachedValue;
+ }
+
+ if (!isArchiveClearedOnLastRun(container)) {
+ LOG.debug("Truncation is not active for container {} because
the archive was not cleared on the last run.", container);
Review Comment:
I generally avoid `.` at the end of log messages, but it seems to be used on
many of these logs; I recommend removing the trailing periods.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ final long maxClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
+
+ // Create a small claim C1 at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim c1 = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(c1)) {
+ out.write(smallData);
+ }
+ // C1 should NOT be a truncation candidate (it's small)
Review Comment:
This comment is duplicative of the message with the assertion.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1047,119 @@ public void purge() {
resourceClaimManager.purge();
}
+ private class TruncateClaims implements Runnable {
+
+ @Override
+ public void run() {
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
+ while (true) {
+ final List<ContentClaim> toTruncate = new ArrayList<>();
+ resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
+ if (toTruncate.isEmpty()) {
+ return;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
+ final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
+
+ for (final ContentClaim claim : toTruncate) {
+ final String container =
claim.getResourceClaim().getContainer();
+ if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation.", claim, container);
+ claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
+ continue;
+ }
+
+ if (claim.isTruncationCandidate()) {
+ truncate(claim);
+ }
+ }
+
+ claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ private boolean isTruncationActiveForContainer(final String container,
final Map<String, Boolean> activationCache) {
+ // If not archiving data, we consider truncation always active.
+ if (!archiveData) {
+ return true;
+ }
+
+ final Boolean cachedValue = activationCache.get(container);
+ if (cachedValue != null) {
+ return cachedValue;
+ }
+
+ if (!isArchiveClearedOnLastRun(container)) {
+ LOG.debug("Truncation is not active for container {} because
the archive was not cleared on the last run.", container);
+ activationCache.put(container, false);
+ return false;
+ }
+
+ final long usableSpace;
+ try {
+ usableSpace = getContainerUsableSpace(container);
+ } catch (final IOException ioe) {
+ LOG.warn("Failed to determine usable space for container {}.
Will not truncate claims for this container.", container, ioe);
+ return false;
+ }
+
+ final Long minUsableSpace =
minUsableContainerBytesForArchive.get(container);
+ if (minUsableSpace != null && usableSpace < minUsableSpace) {
+ LOG.debug("Truncate is active for Container {} because usable
space of {} bytes is below the desired threshold of {} bytes.",
+ container, usableSpace, minUsableSpace);
+
+ activationCache.put(container, true);
+ return true;
+ }
+
+ activationCache.put(container, false);
+ return false;
+ }
+
+ private void truncate(final ContentClaim claim) {
+ LOG.info("Truncating {} to {} bytes because the FlowFile occupying
the last {} bytes has been removed",
+ claim.getResourceClaim(), claim.getOffset(),
claim.getLength());
+
+ final Path path = getPath(claim);
+ if (path == null) {
+ LOG.warn("Cannot truncate {} because the file cannot be
found", claim);
+ return;
+ }
+
+ try (final FileChannel fileChannel = FileChannel.open(path,
StandardOpenOption.WRITE)) {
+ fileChannel.truncate(claim.getOffset());
+ } catch (final NoSuchFileException nsfe) {
+ // This is unlikely but can occur if the claim was truncatable
and the underlying Resource Claim becomes
+ // destructable. In this case, we may archive or delete the
entire ResourceClaim. This is safe to ignore,
+ // since it means the data is cleaned up anyway.
+ LOG.debug("Failed to truncate {} because file does not
exist.", claim, nsfe);
Review Comment:
```suggestion
LOG.debug("Failed to truncate {} because file [{}] does not
exist", claim, path, nsfe);
```
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties
nifiProperties) {
retainOrphanedFlowFiles = orphanedFlowFileProperty == null ||
Boolean.parseBoolean(orphanedFlowFileProperty);
this.maxCharactersToCache =
nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
+ final long maxAppendableClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
+ truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);
Review Comment:
It would be helpful to comment on the reason for the minimum value selected.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -99,9 +101,10 @@ public class FileSystemRepository implements
ContentRepository {
private final List<String> containerNames;
private final AtomicLong index;
- private final ScheduledExecutorService executor = new FlowEngine(4,
"FileSystemRepository Workers", true);
+ private final ScheduledExecutorService executor = new FlowEngine(6,
"FileSystemRepository Workers", true);
Review Comment:
Any particular reason for increasing from 4 to 6 that is worth commenting?
Due to the addition of the `TruncateClaims` Runnable?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -777,6 +828,18 @@ public long loadFlowFiles(final QueueProvider
queueProvider) throws IOException
continue;
} else if (claim != null) {
+ // If the claim exceeds the max appendable claim length on its
own and doesn't start the Resource Claim,
+ // we will consider it to be eligible for truncation. However,
if there are multiple FlowFiles sharing the
+ // same claim, we cannot truncate it because doing so would
affect the other FlowFiles.
+ if (claim.getOffset() > 0 && claim.getLength() >
truncationThreshold && claim instanceof final StandardContentClaim scc) {
Review Comment:
It seems like the claim instance type should be checked first, or is this
done after for a specific reason? On the other hand, could this ever be
something other than the `StandardContentClaim`?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ final long maxClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
+
+ // Create a small claim C1 at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim c1 = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(c1)) {
+ out.write(smallData);
+ }
+ // C1 should NOT be a truncation candidate (it's small)
+ assertFalse(c1.isTruncationCandidate(), "Small claim at offset 0
should not be a truncation candidate");
+
+ // Now create C2 on potentially the same ResourceClaim, writing more
than maxAppendableClaimLength to freeze
+ // the ResourceClaim. Because c1 was small and recycled, c2 will be at
a non-zero offset on the same ResourceClaim.
+ final ContentClaim c2 = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(c2)) {
+ out.write(largeData);
+ }
+ // C2 should be a truncation candidate: large and at a non-zero offset
+ assertTrue(c2.isTruncationCandidate(), "Large claim at non-zero offset
should be a truncation candidate");
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim scc &&
scc.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim, "Should have found a claim at offset
0");
+ assertFalse(offsetZeroClaim.isTruncationCandidate(), "Large claim at
offset 0 should NOT be a truncation candidate");
+ }
+
+ @Test
+ public void testIncrementClaimantCountClearsTruncationCandidate() throws
IOException {
+ final long maxClaimLength =
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(),
DataUnit.B).longValue();
+
+ // Create a small claim to start a ResourceClaim, then a large claim
to freeze it
+ final ContentClaim c1 = repository.create(false);
+ try (final OutputStream out = repository.write(c1)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim c2 = repository.create(false);
+ try (final OutputStream out = repository.write(c2)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ // c2 should be a truncation candidate
+ assertTrue(c2.isTruncationCandidate(), "Claim should be a truncation
candidate before incrementClaimaintCount");
+
+ // Simulate a clone by incrementing claimant count
+ repository.incrementClaimaintCount(c2);
+
+ // After incrementing, it should no longer be a truncation candidate
+ assertFalse(c2.isTruncationCandidate(), "Claim should NOT be a
truncation candidate after incrementClaimaintCount");
Review Comment:
A number of the assert methods have comments that mirror the message for the
assertion; I recommend removing those duplicate comments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]