exceptionfactory commented on code in PR #10874:
URL: https://github.com/apache/nifi/pull/10874#discussion_r2933627645
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation,
String newPartitionName)
return swapLocation;
}
}
+
+ //
=========================================================================
+ // Truncation Feature: Helpers
+ //
=========================================================================
+
+ /**
+ * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+ private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+ }
+
+ private RuntimeRepoContext createRuntimeRepoContext() {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+ return new RuntimeRepoContext(claimManager, queueProvider, queue);
+ }
+
+ private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
+ final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
+ claim.setLength(length);
+ if (truncationCandidate) {
+ claim.setTruncationCandidate(true);
+ }
+ return claim;
+ }
+
+ private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws
IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(claim)
+ .build();
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
+ createRecord.setWorking(flowFile, false);
+ createRecord.setDestination(queue);
+ repo.updateRepository(List.of(createRecord));
+
+ final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
+ deleteRecord.markForDelete();
+ repo.updateRepository(List.of(deleteRecord));
+ }
+
+ /**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+ private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
+ final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+ final Connection writeConnection = Mockito.mock(Connection.class);
+ when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+ when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+ writeQueueProvider.addConnection(writeConnection);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(writeClaimManager);
+ repo.loadFlowFiles(writeQueueProvider);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ for (int i = 0; i < claims.length; i++) {
+ final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+ .id(i + 1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
+ .contentClaim(claims[i])
+ .build();
+ final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
+ rec.setWorking(ff, false);
+ rec.setDestination(writeQueue);
+ records.add(rec);
+ }
+ repo.updateRepository(records);
+ }
+
+ // Recover
+ final List<FlowFileRecord> recovered = new ArrayList<>();
+ final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+ when(recoveryQueue.getIdentifier()).thenReturn("1234");
+ doAnswer(invocation -> {
+ recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+ return null;
+ }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+ final Connection recoveryConnection = Mockito.mock(Connection.class);
+ when(recoveryConnection.getIdentifier()).thenReturn("1234");
+ when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+ final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
+ recoveryQueueProvider.addConnection(recoveryConnection);
+
+ try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(recoveryQueueProvider);
+ }
+
+ return recovered;
+ }
+
+ private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
+ return recovered.stream()
+ .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
+ .findFirst()
+ .orElse(null);
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Runtime Tests
+ //
=========================================================================
+
+ @Test
+ public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+ final ResourceClaim rc =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc);
+ ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that
after delete decrement it stays > 0 (not destructable)
+ final StandardContentClaim contentClaim = createClaim(rc, 1024L,
5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+ createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(contentClaim), "Truncatable claim should
have been routed to the truncation queue");
+ }
+
+ @Test
+ public void testDestructableClaimTakesPriorityOverTruncatable() throws
IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+ final ResourceClaim rc =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will
reach 0 after delete
+ final StandardContentClaim contentClaim = createClaim(rc, 1024L,
5_000_000L, true);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+ createAndDeleteFlowFile(repo, ctx.queue(), contentClaim);
+ repo.checkpoint();
+ }
+
+ final List<ResourceClaim> destructed = new ArrayList<>();
+ ctx.claimManager().drainDestructableClaims(destructed, 100);
+ assertTrue(destructed.contains(rc), "Resource claim should be
destructable");
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertFalse(truncated.contains(contentClaim), "Truncatable claim
should NOT be in truncation queue when resource claim is destructable");
+ }
+
+ @Test
+ public void testUpdateRecordOriginalClaimQueuedForTruncation() throws
IOException {
+ final RuntimeRepoContext ctx = createRuntimeRepoContext();
+
+ final ResourceClaim rc1 =
ctx.claimManager().newResourceClaim("container", "section", "1", false, false);
+ ctx.claimManager().incrementClaimantCount(rc1);
+ ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it
stays > 0 after decrement
+ final StandardContentClaim originalClaim = createClaim(rc1, 2048L,
5_000_000L, true);
+
+ final ResourceClaim rc2 =
ctx.claimManager().newResourceClaim("container", "section", "2", false, false);
+ ctx.claimManager().incrementClaimantCount(rc2);
+ final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L,
false);
+
+ final FlowFileRecord originalFlowFile = new
StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(originalClaim)
+ .build();
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(ctx.claimManager());
+ repo.loadFlowFiles(ctx.queueProvider());
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(ctx.queue());
+ createRecord.setWorking(originalFlowFile, false);
+ createRecord.setDestination(ctx.queue());
+ repo.updateRepository(List.of(createRecord));
+
+ final FlowFileRecord updatedFlowFile = new
StandardFlowFileRecord.Builder()
+ .fromFlowFile(originalFlowFile)
+ .contentClaim(newClaim)
+ .build();
+ final StandardRepositoryRecord updateRecord = new
StandardRepositoryRecord(ctx.queue(), originalFlowFile);
+ updateRecord.setWorking(updatedFlowFile, true);
+ updateRecord.setDestination(ctx.queue());
+ repo.updateRepository(List.of(updateRecord));
+ repo.checkpoint();
+ }
+
+ final List<ContentClaim> truncated = new ArrayList<>();
+ ctx.claimManager().drainTruncatableClaims(truncated, 100);
+ assertTrue(truncated.contains(originalClaim), "Original claim should
have been queued for truncation after content change");
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Recovery Tests
+ //
=========================================================================
+
+ @Test
+ public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws
IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L,
false);
+ final StandardContentClaim largeClaim = createClaim(rc, 100L,
2_000_000L, false);
+
+ final List<FlowFileRecord> recovered = writeAndRecover(smallClaim,
largeClaim);
+
+ final FlowFileRecord recoveredLargeFF =
findRecoveredByOffset(recovered, 100L);
+ assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with
the large claim");
+ assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(),
+ "Large tail claim should be marked as truncation candidate
after recovery");
+
+ final FlowFileRecord recoveredSmallFF =
findRecoveredByOffset(recovered, 0L);
+ assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with
the small claim");
+ assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(),
+ "Small claim at offset 0 should NOT be a truncation candidate
after recovery");
+ }
+
+ @Test
+ public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim sharedClaim = createClaim(rc, 100L,
2_000_000L, false);
+
+ // Two FlowFiles sharing the same claim (clone scenario)
+ final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim,
sharedClaim);
+
+ for (final FlowFileRecord ff : recovered) {
+ if (ff.getContentClaim() != null) {
+ assertFalse(ff.getContentClaim().isTruncationCandidate(),
+ "Cloned/shared claim should NOT be a truncation
candidate");
+ }
+ }
+ }
+
+ @Test
+ public void testRecoveryOnlyMarksTailClaim() throws IOException {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final ResourceClaim rc = claimManager.newResourceClaim("container",
"section", "1", false, false);
+ final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L,
false);
Review Comment:
That's fair, sounds good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]