exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933627645


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
             return swapLocation;
         }
     }
+
+    // =========================================================================
+    // Truncation Feature: Helpers
+    // =========================================================================
+
+    /**
+     * Creates a mock queue + connection + queueProvider wired together, suitable for runtime truncation tests.
+     * Returns [claimManager, queueProvider, queue].
+     */
+    private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
+    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+                                         final ContentClaim claim) throws 
IOException {
+        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(claim)
+                .build();
+
+        final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+        createRecord.setWorking(flowFile, false);
+        createRecord.setDestination(queue);
+        repo.updateRepository(List.of(createRecord));
+
+        final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+        deleteRecord.markForDelete();
+        repo.updateRepository(List.of(deleteRecord));
+    }
+
+    /**
+     * Writes FlowFiles (one per claim) to a new repo, closes it, then recovers into a fresh repo
+     * and returns the recovered FlowFileRecords.
+     */
+    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) 
throws IOException {
+        final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+        final Connection writeConnection = Mockito.mock(Connection.class);
+        when(writeConnection.getIdentifier()).thenReturn("1234");
+        
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+        writeQueueProvider.addConnection(writeConnection);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(writeClaimManager);
+            repo.loadFlowFiles(writeQueueProvider);
+
+            final List<RepositoryRecord> records = new ArrayList<>();
+            for (int i = 0; i < claims.length; i++) {
+                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+                        .id(i + 1L)
+                        .addAttribute("uuid", "11111111-1111-1111-1111-" + 
String.format("%012d", i + 1))
+                        .contentClaim(claims[i])
+                        .build();
+                final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+                rec.setWorking(ff, false);
+                rec.setDestination(writeQueue);
+                records.add(rec);
+            }
+            repo.updateRepository(records);
+        }
+
+        // Recover
+        final List<FlowFileRecord> recovered = new ArrayList<>();
+        final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+        when(recoveryQueue.getIdentifier()).thenReturn("1234");
+        doAnswer(invocation -> {
+            recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+            return null;
+        }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+        final Connection recoveryConnection = Mockito.mock(Connection.class);
+        when(recoveryConnection.getIdentifier()).thenReturn("1234");
+        when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+        final TestQueueProvider recoveryQueueProvider = new 
TestQueueProvider();
+        recoveryQueueProvider.addConnection(recoveryConnection);
+
+        try (final WriteAheadFlowFileRepository repo2 = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo2.initialize(new StandardResourceClaimManager());
+            repo2.loadFlowFiles(recoveryQueueProvider);
+        }
+
+        return recovered;
+    }
+
+    private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord> 
recovered, final long offset) {
+        return recovered.stream()
+                .filter(ff -> ff.getContentClaim() != null && 
ff.getContentClaim().getOffset() == offset)
+                .findFirst()
+                .orElse(null);
+    }
+
+    // =========================================================================
+    // Truncation Feature: Runtime Tests
+    // =========================================================================
+
+    @Test
+    public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() 
throws IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+        final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc);
+        ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that 
after delete decrement it stays > 0 (not destructable)
+        final StandardContentClaim contentClaim = createClaim(rc, 1024L, 
5_000_000L, true);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+            createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+            repo.checkpoint();
+        }
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertTrue(truncated.contains(contentClaim), "Truncatable claim should 
have been routed to the truncation queue");
+    }
+
+    @Test
+    public void testDestructableClaimTakesPriorityOverTruncatable() throws 
IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+        final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will 
reach 0 after delete
+        final StandardContentClaim contentClaim = createClaim(rc, 1024L, 
5_000_000L, true);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+            createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+            repo.checkpoint();
+        }
+
+        final List<ResourceClaim> destructed = new ArrayList<>();
+        ctx.claimManager().drainDestructableClaims(destructed, 100);
+        assertTrue(destructed.contains(rc), "Resource claim should be 
destructable");
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertFalse(truncated.contains(contentClaim), "Truncatable claim 
should NOT be in truncation queue when resource claim is destructable");
+    }
+
+    @Test
+    public void testUpdateRecordOriginalClaimQueuedForTruncation() throws 
IOException {
+        final RuntimeRepoContext ctx = createRuntimeRepoContext();
+
+        final ResourceClaim rc1 = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+        ctx.claimManager().incrementClaimantCount(rc1);
+        ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it 
stays > 0 after decrement
+        final StandardContentClaim originalClaim = createClaim(rc1, 2048L, 
5_000_000L, true);
+
+        final ResourceClaim rc2 = 
ctx.claimManager().newResourceClaim("container", "section", "2", false, false);
+        ctx.claimManager().incrementClaimantCount(rc2);
+        final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L, 
false);
+
+        final FlowFileRecord originalFlowFile = new 
StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(originalClaim)
+                .build();
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(ctx.claimManager());
+            repo.loadFlowFiles(ctx.queueProvider());
+
+            final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(ctx.queue());
+            createRecord.setWorking(originalFlowFile, false);
+            createRecord.setDestination(ctx.queue());
+            repo.updateRepository(List.of(createRecord));
+
+            final FlowFileRecord updatedFlowFile = new 
StandardFlowFileRecord.Builder()
+                    .fromFlowFile(originalFlowFile)
+                    .contentClaim(newClaim)
+                    .build();
+            final StandardRepositoryRecord updateRecord = new 
StandardRepositoryRecord(ctx.queue(), originalFlowFile);
+            updateRecord.setWorking(updatedFlowFile, true);
+            updateRecord.setDestination(ctx.queue());
+            repo.updateRepository(List.of(updateRecord));
+            repo.checkpoint();
+        }
+
+        final List<ContentClaim> truncated = new ArrayList<>();
+        ctx.claimManager().drainTruncatableClaims(truncated, 100);
+        assertTrue(truncated.contains(originalClaim), "Original claim should 
have been queued for truncation after content change");
+    }
+
+    // =========================================================================
+    // Truncation Feature: Recovery Tests
+    // =========================================================================
+
+    @Test
+    public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws 
IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L, 
false);
+        final StandardContentClaim largeClaim = createClaim(rc, 100L, 
2_000_000L, false);
+
+        final List<FlowFileRecord> recovered = writeAndRecover(smallClaim, 
largeClaim);
+
+        final FlowFileRecord recoveredLargeFF = 
findRecoveredByOffset(recovered, 100L);
+        assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with 
the large claim");
+        assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(),
+                "Large tail claim should be marked as truncation candidate 
after recovery");
+
+        final FlowFileRecord recoveredSmallFF = 
findRecoveredByOffset(recovered, 0L);
+        assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with 
the small claim");
+        assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(),
+                "Small claim at offset 0 should NOT be a truncation candidate 
after recovery");
+    }
+
+    @Test
+    public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim sharedClaim = createClaim(rc, 100L, 
2_000_000L, false);
+
+        // Two FlowFiles sharing the same claim (clone scenario)
+        final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim, 
sharedClaim);
+
+        for (final FlowFileRecord ff : recovered) {
+            if (ff.getContentClaim() != null) {
+                assertFalse(ff.getContentClaim().isTruncationCandidate(),
+                        "Cloned/shared claim should NOT be a truncation 
candidate");
+            }
+        }
+    }
+
+    @Test
+    public void testRecoveryOnlyMarksTailClaim() throws IOException {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final ResourceClaim rc = claimManager.newResourceClaim("container", 
"section", "1", false, false);
+        final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L, 
false);

Review Comment:
   That's fair, sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to