exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2931926951


##########
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##########
@@ -161,6 +162,30 @@ public void markDestructable(final ResourceClaim claim) {
         }
     }
 
+    @Override
+    public void markTruncatable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        synchronized (resourceClaim) {
+            if (isDestructable(resourceClaim)) {
+                return;
+            }
+
+            logger.debug("Marking {} as truncatable", contentClaim);
+            try {
+                if (!truncatableClaims.offer(contentClaim, 1, 
TimeUnit.MINUTES)) {
+                    logger.debug("Unable to mark {} as truncatable because the 
queue is full.", contentClaim);

Review Comment:
   It seems like this would be better as an `INFO` level, and it would be 
useful to include the queue size.
   ```suggestion
                       logger.info("Unable to mark {} as truncatable because 
maximum queue size [{}] reached", contentClaim, truncatableClaims.size());
   ```



##########
nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@DefaultSchedule(period = "10 mins")
+public class GenerateTruncatableFlowFiles extends AbstractProcessor {
+
+    static final PropertyDescriptor BATCH_COUNT = new 
PropertyDescriptor.Builder()
+        .name("Batch Count")
+        .description("The maximum number of batches to generate. Each batch 
produces 10 FlowFiles (9 small + 1 large). "
+                     + "Once this many batches have been generated, no more 
FlowFiles will be produced until the processor is stopped and restarted.")

Review Comment:
   Recommend using a multi-line string



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
             return swapLocation;
         }
     }
+
+    // 
=========================================================================
+    // Truncation Feature: Helpers
+    // 
=========================================================================
+
+    /**
+     * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+     * Returns [claimManager, queueProvider, queue].
+     */
+    private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
+    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+                                         final ContentClaim claim) throws 
IOException {
+        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(claim)
+                .build();
+
+        final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+        createRecord.setWorking(flowFile, false);
+        createRecord.setDestination(queue);
+        repo.updateRepository(List.of(createRecord));
+
+        final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+        deleteRecord.markForDelete();
+        repo.updateRepository(List.of(deleteRecord));
+    }
+
+    /**
+     * Writes FlowFiles (one per claim) to a new repo, closes it, then 
recovers into a fresh repo
+     * and returns the recovered FlowFileRecords.
+     */
+    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) 
throws IOException {
+        final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+        final Connection writeConnection = Mockito.mock(Connection.class);
+        when(writeConnection.getIdentifier()).thenReturn("1234");
+        
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+        writeQueueProvider.addConnection(writeConnection);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(writeClaimManager);
+            repo.loadFlowFiles(writeQueueProvider);
+
+            final List<RepositoryRecord> records = new ArrayList<>();
+            for (int i = 0; i < claims.length; i++) {
+                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+                        .id(i + 1L)
+                        .addAttribute("uuid", "11111111-1111-1111-1111-" + 
String.format("%012d", i + 1))
+                        .contentClaim(claims[i])
+                        .build();
+                final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+                rec.setWorking(ff, false);
+                rec.setDestination(writeQueue);
+                records.add(rec);
+            }
+            repo.updateRepository(records);
+        }
+
+        // Recover
+        final List<FlowFileRecord> recovered = new ArrayList<>();
+        final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+        when(recoveryQueue.getIdentifier()).thenReturn("1234");
+        doAnswer(invocation -> {
+            recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+            return null;
+        }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+        final Connection recoveryConnection = Mockito.mock(Connection.class);
+        when(recoveryConnection.getIdentifier()).thenReturn("1234");
+        when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+        final TestQueueProvider recoveryQueueProvider = new 
TestQueueProvider();
+        recoveryQueueProvider.addConnection(recoveryConnection);
+
+        try (final WriteAheadFlowFileRepository repo2 = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo2.initialize(new StandardResourceClaimManager());
+            repo2.loadFlowFiles(recoveryQueueProvider);
+        }
+
+        return recovered;
+    }
+
+    private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord> 
recovered, final long offset) {
+        return recovered.stream()
+                .filter(ff -> ff.getContentClaim() != null && 
ff.getContentClaim().getOffset() == offset)
+                .findFirst()
+                .orElse(null);
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Runtime Tests
+    // 
=========================================================================
+
+    @Test
+    public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() 
throws IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+        final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc);
+        ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that 
after delete decrement it stays > 0 (not destructable)
+        final StandardContentClaim contentClaim = createClaim(rc, 1024L, 
5_000_000L, true);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+            createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+            repo.checkpoint();
+        }
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertTrue(truncated.contains(contentClaim), "Truncatable claim should 
have been routed to the truncation queue");
+    }
+
+    @Test
+    public void testDestructableClaimTakesPriorityOverTruncatable() throws 
IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+        final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will 
reach 0 after delete
+        final StandardContentClaim contentClaim = createClaim(rc, 1024L, 
5_000_000L, true);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+            createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+            repo.checkpoint();
+        }
+
+        final List<ResourceClaim> destructed = new ArrayList<>();
+        ctx.claimManager().drainDestructableClaims(destructed, 100);
+        assertTrue(destructed.contains(rc), "Resource claim should be 
destructable");
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertFalse(truncated.contains(contentClaim), "Truncatable claim 
should NOT be in truncation queue when resource claim is destructable");
+    }
+
+    @Test
+    public void testUpdateRecordOriginalClaimQueuedForTruncation() throws 
IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+
+        final ResourceClaim rc1 = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc1);
+        ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it 
stays > 0 after decrement
+        final StandardContentClaim originalClaim = createClaim(rc1, 2048L, 
5_000_000L, true);
+
+        final ResourceClaim rc2 = 
ctx.claimManager().newResourceClaim("container", "section", "2", false, false);
+        ctx.claimManager().incrementClaimantCount(rc2);
+        final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L, 
false);
+
+        final FlowFileRecord originalFlowFile = new 
StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(originalClaim)
+                .build();
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+
+            final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(ctx.queue());
+            createRecord.setWorking(originalFlowFile, false);
+            createRecord.setDestination(ctx.queue());
+            repo.updateRepository(List.of(createRecord));
+
+            final FlowFileRecord updatedFlowFile = new 
StandardFlowFileRecord.Builder()
+                    .fromFlowFile(originalFlowFile)
+                    .contentClaim(newClaim)
+                    .build();
+            final StandardRepositoryRecord updateRecord = new 
StandardRepositoryRecord(ctx.queue(), originalFlowFile);
+            updateRecord.setWorking(updatedFlowFile, true);
+            updateRecord.setDestination(ctx.queue());
+            repo.updateRepository(List.of(updateRecord));
+            repo.checkpoint();
+        }
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertTrue(truncated.contains(originalClaim), "Original claim should 
have been queued for truncation after content change");
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Recovery Tests
+    // 
=========================================================================
+
+    @Test
+    public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws 
IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L, 
false);
+        final StandardContentClaim largeClaim = createClaim(rc, 100L, 
2_000_000L, false);
+
+        final List<FlowFileRecord> recovered = writeAndRecover(smallClaim, 
largeClaim);
+
+        final FlowFileRecord recoveredLargeFF = 
findRecoveredByOffset(recovered, 100L);
+        assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with 
the large claim");
+        assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(),
+                "Large tail claim should be marked as truncation candidate 
after recovery");
+
+        final FlowFileRecord recoveredSmallFF = 
findRecoveredByOffset(recovered, 0L);
+        assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with 
the small claim");
+        assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(),
+                "Small claim at offset 0 should NOT be a truncation candidate 
after recovery");
+    }
+
+    @Test
+    public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim sharedClaim = createClaim(rc, 100L, 
2_000_000L, false);
+
+        // Two FlowFiles sharing the same claim (clone scenario)
+        final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim, 
sharedClaim);
+
+        for (final FlowFileRecord ff : recovered) {
+            if (ff.getContentClaim() != null) {
+                assertFalse(ff.getContentClaim().isTruncationCandidate(),
+                        "Cloned/shared claim should NOT be a truncation 
candidate");
+            }
+        }
+    }
+
+    @Test
+    public void testRecoveryOnlyMarksTailClaim() throws IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L, 
false);
+        final StandardContentClaim claim2 = createClaim(rc, 2_000_100L, 
3_000_000L, false);
+
+        final List<FlowFileRecord> recovered = writeAndRecover(claim1, claim2);
+
+        final FlowFileRecord tailFF = findRecoveredByOffset(recovered, 
2_000_100L);
+        assertNotNull(tailFF, "Should have recovered the tail claim FlowFile");
+        assertTrue(tailFF.getContentClaim().isTruncationCandidate(),
+                "Only the tail claim should be a truncation candidate");
+
+        final FlowFileRecord nonTailFF = findRecoveredByOffset(recovered, 
100L);
+        assertNotNull(nonTailFF, "Should have recovered the non-tail claim 
FlowFile");
+        assertFalse(nonTailFF.getContentClaim().isTruncationCandidate(),
+                "Non-tail large claim should NOT be a truncation candidate");
+    }
+
+    @Test
+    public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws 
IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim smallClaim1 = createClaim(rc, 0L, 100L, 
false);
+        final StandardContentClaim largeClaim = createClaim(rc, 100L, 
2_000_000L, false);
+        final StandardContentClaim smallClaim2 = createClaim(rc, 2_000_100L, 
50L, false);
+
+        final List<FlowFileRecord> recovered = writeAndRecover(smallClaim1, 
largeClaim, smallClaim2);
+
+        for (final FlowFileRecord ff : recovered) {
+            if (ff.getContentClaim() != null) {

Review Comment:
   Is it possible for all of the recovered records to have `null` content 
Claims? It looks like it would be better to get a filtered list of non-null 
recovered records, assert that the list is not empty, and then check the 
truncation candidate status.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
             return swapLocation;
         }
     }
+
+    // 
=========================================================================
+    // Truncation Feature: Helpers
+    // 
=========================================================================
+
+    /**
+     * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+     * Returns [claimManager, queueProvider, queue].
+     */
+    private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
+    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+                                         final ContentClaim claim) throws 
IOException {
+        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(claim)
+                .build();
+
+        final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+        createRecord.setWorking(flowFile, false);
+        createRecord.setDestination(queue);
+        repo.updateRepository(List.of(createRecord));
+
+        final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+        deleteRecord.markForDelete();
+        repo.updateRepository(List.of(deleteRecord));
+    }
+
+    /**
+     * Writes FlowFiles (one per claim) to a new repo, closes it, then 
recovers into a fresh repo
+     * and returns the recovered FlowFileRecords.
+     */
+    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) 
throws IOException {
+        final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+        final Connection writeConnection = Mockito.mock(Connection.class);
+        when(writeConnection.getIdentifier()).thenReturn("1234");
+        
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+        writeQueueProvider.addConnection(writeConnection);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(writeClaimManager);
+            repo.loadFlowFiles(writeQueueProvider);
+
+            final List<RepositoryRecord> records = new ArrayList<>();
+            for (int i = 0; i < claims.length; i++) {
+                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+                        .id(i + 1L)
+                        .addAttribute("uuid", "11111111-1111-1111-1111-" + 
String.format("%012d", i + 1))
+                        .contentClaim(claims[i])
+                        .build();
+                final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+                rec.setWorking(ff, false);
+                rec.setDestination(writeQueue);
+                records.add(rec);
+            }
+            repo.updateRepository(records);
+        }
+
+        // Recover
+        final List<FlowFileRecord> recovered = new ArrayList<>();
+        final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+        when(recoveryQueue.getIdentifier()).thenReturn("1234");
+        doAnswer(invocation -> {
+            recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+            return null;
+        }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+        final Connection recoveryConnection = Mockito.mock(Connection.class);
+        when(recoveryConnection.getIdentifier()).thenReturn("1234");
+        when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+        final TestQueueProvider recoveryQueueProvider = new 
TestQueueProvider();
+        recoveryQueueProvider.addConnection(recoveryConnection);
+
+        try (final WriteAheadFlowFileRepository repo2 = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo2.initialize(new StandardResourceClaimManager());
+            repo2.loadFlowFiles(recoveryQueueProvider);
+        }
+
+        return recovered;
+    }
+
+    private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord> 
recovered, final long offset) {
+        return recovered.stream()
+                .filter(ff -> ff.getContentClaim() != null && 
ff.getContentClaim().getOffset() == offset)
+                .findFirst()
+                .orElse(null);
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Runtime Tests
+    // 
=========================================================================
+
+    @Test
+    public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() 
throws IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+        final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc);
+        ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that 
after delete decrement it stays > 0 (not destructable)
+        final StandardContentClaim contentClaim = createClaim(rc, 1024L, 
5_000_000L, true);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+            createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+            repo.checkpoint();
+        }
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertTrue(truncated.contains(contentClaim), "Truncatable claim should 
have been routed to the truncation queue");
+    }
+
+    @Test
+    public void testDestructableClaimTakesPriorityOverTruncatable() throws 
IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+        final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will 
reach 0 after delete
+        final StandardContentClaim contentClaim = createClaim(rc, 1024L, 
5_000_000L, true);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+            createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+            repo.checkpoint();
+        }
+
+        final List<ResourceClaim> destructed = new ArrayList<>();
+        ctx.claimManager().drainDestructableClaims(destructed, 100);
+        assertTrue(destructed.contains(rc), "Resource claim should be 
destructable");
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertFalse(truncated.contains(contentClaim), "Truncatable claim 
should NOT be in truncation queue when resource claim is destructable");
+    }
+
+    @Test
+    public void testUpdateRecordOriginalClaimQueuedForTruncation() throws 
IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+
+        final ResourceClaim rc1 = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc1);
+        ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it 
stays > 0 after decrement
+        final StandardContentClaim originalClaim = createClaim(rc1, 2048L, 
5_000_000L, true);
+
+        final ResourceClaim rc2 = 
ctx.claimManager().newResourceClaim("container", "section", "2", false, false);
+        ctx.claimManager().incrementClaimantCount(rc2);
+        final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L, 
false);
+
+        final FlowFileRecord originalFlowFile = new 
StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(originalClaim)
+                .build();
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+
+            final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(ctx.queue());
+            createRecord.setWorking(originalFlowFile, false);
+            createRecord.setDestination(ctx.queue());
+            repo.updateRepository(List.of(createRecord));
+
+            final FlowFileRecord updatedFlowFile = new 
StandardFlowFileRecord.Builder()
+                    .fromFlowFile(originalFlowFile)
+                    .contentClaim(newClaim)
+                    .build();
+            final StandardRepositoryRecord updateRecord = new 
StandardRepositoryRecord(ctx.queue(), originalFlowFile);
+            updateRecord.setWorking(updatedFlowFile, true);
+            updateRecord.setDestination(ctx.queue());
+            repo.updateRepository(List.of(updateRecord));
+            repo.checkpoint();
+        }
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertTrue(truncated.contains(originalClaim), "Original claim should 
have been queued for truncation after content change");
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Recovery Tests
+    // 
=========================================================================
+
+    @Test
+    public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws 
IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L, 
false);
+        final StandardContentClaim largeClaim = createClaim(rc, 100L, 
2_000_000L, false);
+
+        final List<FlowFileRecord> recovered = writeAndRecover(smallClaim, 
largeClaim);
+
+        final FlowFileRecord recoveredLargeFF = 
findRecoveredByOffset(recovered, 100L);
+        assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with 
the large claim");
+        assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(),
+                "Large tail claim should be marked as truncation candidate 
after recovery");
+
+        final FlowFileRecord recoveredSmallFF = 
findRecoveredByOffset(recovered, 0L);
+        assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with 
the small claim");
+        assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(),
+                "Small claim at offset 0 should NOT be a truncation candidate 
after recovery");
+    }
+
+    @Test
+    public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim sharedClaim = createClaim(rc, 100L, 
2_000_000L, false);
+
+        // Two FlowFiles sharing the same claim (clone scenario)
+        final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim, 
sharedClaim);
+
+        for (final FlowFileRecord ff : recovered) {
+            if (ff.getContentClaim() != null) {
+                assertFalse(ff.getContentClaim().isTruncationCandidate(),
+                        "Cloned/shared claim should NOT be a truncation 
candidate");
+            }
+        }
+    }
+
+    @Test
+    public void testRecoveryOnlyMarksTailClaim() throws IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L, 
false);

Review Comment:
   The sizes in this method and several others appear to be reused multiple 
times, so it looks like it would be helpful to introduce static final constants 
for a few of the repeated values.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -786,6 +849,16 @@ public long loadFlowFiles(final QueueProvider 
queueProvider) throws IOException
         // If recoveredRecords has been populated it need to be nulled out now 
because it is no longer useful and can be garbage collected.
         recoveredRecords = null;
 
+        // If any Content Claim was determined to be truncatable, mark it as 
such now.
+        for (final StandardContentClaim eligible : truncationEligibleClaims) {
+            final ContentClaim latestForResource = 
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
+            if (!Objects.equals(eligible, latestForResource)) {
+                continue;
+            }
+
+            eligible.setTruncationCandidate(true);

Review Comment:
   Instead of checking not-equals and continuing, it seems like this could be 
changed to check equals, and then conditionally set `true`



##########
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java:
##########
@@ -32,6 +32,7 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardResourceClaimManager.class);
     private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = 
new ConcurrentHashMap<>();
     private final BlockingQueue<ResourceClaim> destructableClaims = new 
LinkedBlockingQueue<>(50000);
+    private final BlockingQueue<ContentClaim> truncatableClaims = new 
LinkedBlockingQueue<>(100000);

Review Comment:
   Is there any particular reason for selecting 100,000 as the queue size? Is 
it related to the destructableClaims size, or just a reasonably high limit? If 
there is any particular reason, it would be helpful to add a comment for future 
reference.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1047,119 @@ public void purge() {
         resourceClaimManager.purge();
     }
 
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new 
HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lose the 
opportunity to truncate that big claim.
+            while (true) {

Review Comment:
   The `while (true) {` construction always gives me pause, although the 
`return` clarifies expected behavior. Since this Runnable is executed on a 
schedule, is it necessary to have this loop, as opposed to just waiting for the 
next invocation from the scheduler?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1903,4 +2064,41 @@ public synchronized ContentClaim newContentClaim() 
throws IOException {
         }
     }
 
+    private static class TruncationClaimManager {
+        private static final int MAX_THRESHOLD = 100_000;
+        private final Map<String, List<ContentClaim>> truncationClaims = new 
HashMap<>();
+
+        public synchronized void addTruncationClaims(final String container, 
final List<ContentClaim> claim) {

Review Comment:
   Do these class methods need to be `public`?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        final long maxClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();

Review Comment:
   Is there a reason for parsing this property as opposed to just declaring the 
static max claim length in this test class?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1047,119 @@ public void purge() {
         resourceClaimManager.purge();
     }
 
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new 
HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lost the 
opportunity to truncate that big claim.
+            while (true) {
+                final List<ContentClaim> toTruncate = new ArrayList<>();
+                resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+                if (toTruncate.isEmpty()) {
+                    return;
+                }
+
+                truncateClaims(toTruncate, truncationActivationCache);
+            }
+        }
+
+        private void truncateClaims(final List<ContentClaim> toTruncate, final 
Map<String, Boolean> truncationActivationCache) {
+            final Map<String, List<ContentClaim>> claimsSkipped = new 
HashMap<>();
+
+            for (final ContentClaim claim : toTruncate) {
+                final String container = 
claim.getResourceClaim().getContainer();
+                if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation.", claim, container);
+                    claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+                    continue;
+                }
+
+                if (claim.isTruncationCandidate()) {
+                    truncate(claim);
+                }
+            }
+
+            claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+        }
+
+        private boolean isTruncationActiveForContainer(final String container, 
final Map<String, Boolean> activationCache) {
+            // If not archiving data, we consider truncation always active.
+            if (!archiveData) {
+                return true;
+            }
+
+            final Boolean cachedValue = activationCache.get(container);
+            if (cachedValue != null) {
+                return cachedValue;
+            }
+
+            if (!isArchiveClearedOnLastRun(container)) {
+                LOG.debug("Truncation is not active for container {} because 
the archive was not cleared on the last run.", container);

Review Comment:
   I generally avoid `.` at the end of log messages, but it seems to be used on 
many of these logs; I recommend removing them.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        final long maxClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+
+        // Create a small claim C1 at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim c1 = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(c1)) {
+            out.write(smallData);
+        }
+        // C1 should NOT be a truncation candidate (it's small)

Review Comment:
   This comment is duplicative of the message with the assertion.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1047,119 @@ public void purge() {
         resourceClaimManager.purge();
     }
 
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new 
HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lose the 
opportunity to truncate that big claim.
+            while (true) {
+                final List<ContentClaim> toTruncate = new ArrayList<>();
+                resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+                if (toTruncate.isEmpty()) {
+                    return;
+                }
+
+                truncateClaims(toTruncate, truncationActivationCache);
+            }
+        }
+
+        private void truncateClaims(final List<ContentClaim> toTruncate, final 
Map<String, Boolean> truncationActivationCache) {
+            final Map<String, List<ContentClaim>> claimsSkipped = new 
HashMap<>();
+
+            for (final ContentClaim claim : toTruncate) {
+                final String container = 
claim.getResourceClaim().getContainer();
+                if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation.", claim, container);
+                    claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+                    continue;
+                }
+
+                if (claim.isTruncationCandidate()) {
+                    truncate(claim);
+                }
+            }
+
+            claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+        }
+
+        private boolean isTruncationActiveForContainer(final String container, 
final Map<String, Boolean> activationCache) {
+            // If not archiving data, we consider truncation always active.
+            if (!archiveData) {
+                return true;
+            }
+
+            final Boolean cachedValue = activationCache.get(container);
+            if (cachedValue != null) {
+                return cachedValue;
+            }
+
+            if (!isArchiveClearedOnLastRun(container)) {
+                LOG.debug("Truncation is not active for container {} because 
the archive was not cleared on the last run.", container);
+                activationCache.put(container, false);
+                return false;
+            }
+
+            final long usableSpace;
+            try {
+                usableSpace = getContainerUsableSpace(container);
+            } catch (final IOException ioe) {
+                LOG.warn("Failed to determine usable space for container {}. 
Will not truncate claims for this container.", container, ioe);
+                return false;
+            }
+
+            final Long minUsableSpace = 
minUsableContainerBytesForArchive.get(container);
+            if (minUsableSpace != null && usableSpace < minUsableSpace) {
+                LOG.debug("Truncate is active for Container {} because usable 
space of {} bytes is below the desired threshold of {} bytes.",
+                    container, usableSpace, minUsableSpace);
+
+                activationCache.put(container, true);
+                return true;
+            }
+
+            activationCache.put(container, false);
+            return false;
+        }
+
+        private void truncate(final ContentClaim claim) {
+            LOG.info("Truncating {} to {} bytes because the FlowFile occupying 
the last {} bytes has been removed",
+                claim.getResourceClaim(), claim.getOffset(), 
claim.getLength());
+
+            final Path path = getPath(claim);
+            if (path == null) {
+                LOG.warn("Cannot truncate {} because the file cannot be 
found", claim);
+                return;
+            }
+
+            try (final FileChannel fileChannel = FileChannel.open(path, 
StandardOpenOption.WRITE)) {
+                fileChannel.truncate(claim.getOffset());
+            } catch (final NoSuchFileException nsfe) {
+                // This is unlikely but can occur if the claim was truncatable 
and the underlying Resource Claim becomes
+                // destructable. In this case, we may archive or delete the 
entire ResourceClaim. This is safe to ignore,
+                // since it means the data is cleaned up anyway.
+                LOG.debug("Failed to truncate {} because file does not 
exist.", claim, nsfe);

Review Comment:
   ```suggestion
                   LOG.debug("Failed to truncate {} because file [{}] does not 
exist", claim, path, nsfe);
   ```



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties 
nifiProperties) {
         retainOrphanedFlowFiles = orphanedFlowFileProperty == null || 
Boolean.parseBoolean(orphanedFlowFileProperty);
 
         this.maxCharactersToCache = 
nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
+        final long maxAppendableClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+        truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);

Review Comment:
   It would be helpful to comment on the reason for the minimum value selected.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -99,9 +101,10 @@ public class FileSystemRepository implements 
ContentRepository {
     private final List<String> containerNames;
     private final AtomicLong index;
 
-    private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
+    private final ScheduledExecutorService executor = new FlowEngine(6, 
"FileSystemRepository Workers", true);

Review Comment:
   Any particular reason for increasing from 4 to 6 that is worth commenting? 
Due to the addition of the `TruncateClaims` Runnable?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -777,6 +828,18 @@ public long loadFlowFiles(final QueueProvider 
queueProvider) throws IOException
 
                 continue;
             } else if (claim != null) {
+                // If the claim exceeds the max appendable claim length on its 
own and doesn't start the Resource Claim,
+                // we will consider it to be eligible for truncation. However, 
if there are multiple FlowFiles sharing the
+                // same claim, we cannot truncate it because doing so would 
affect the other FlowFiles.
+                if (claim.getOffset() > 0 && claim.getLength() > 
truncationThreshold && claim instanceof final StandardContentClaim scc) {

Review Comment:
   It seems like the claim instance type should be checked first, or is this 
done after for a specific reason? On the other hand, could this ever be 
something other than the `StandardContentClaim`?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,6 +905,272 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        final long maxClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+
+        // Create a small claim C1 at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim c1 = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(c1)) {
+            out.write(smallData);
+        }
+        // C1 should NOT be a truncation candidate (it's small)
+        assertFalse(c1.isTruncationCandidate(), "Small claim at offset 0 
should not be a truncation candidate");
+
+        // Now create C2 on potentially the same ResourceClaim, writing more 
than maxAppendableClaimLength to freeze
+        // the ResourceClaim. Because c1 was small and recycled, c2 will be at 
a non-zero offset on the same ResourceClaim.
+        final ContentClaim c2 = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(c2)) {
+            out.write(largeData);
+        }
+        // C2 should be a truncation candidate: large and at a non-zero offset
+        assertTrue(c2.isTruncationCandidate(), "Large claim at non-zero offset 
should be a truncation candidate");
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim scc && 
scc.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim, "Should have found a claim at offset 
0");
+        assertFalse(offsetZeroClaim.isTruncationCandidate(), "Large claim at 
offset 0 should NOT be a truncation candidate");
+    }
+
+    @Test
+    public void testIncrementClaimantCountClearsTruncationCandidate() throws 
IOException {
+        final long maxClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+
+        // Create a small claim to start a ResourceClaim, then a large claim 
to freeze it
+        final ContentClaim c1 = repository.create(false);
+        try (final OutputStream out = repository.write(c1)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim c2 = repository.create(false);
+        try (final OutputStream out = repository.write(c2)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        // c2 should be a truncation candidate
+        assertTrue(c2.isTruncationCandidate(), "Claim should be a truncation 
candidate before incrementClaimaintCount");
+
+        // Simulate a clone by incrementing claimant count
+        repository.incrementClaimaintCount(c2);
+
+        // After incrementing, it should no longer be a truncation candidate
+        assertFalse(c2.isTruncationCandidate(), "Claim should NOT be a 
truncation candidate after incrementClaimaintCount");

Review Comment:
   A number of the assert methods have comments that mirror the message for the 
assertion; I recommend removing the comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to