This is an automated email from the ASF dual-hosted git repository.
jmclean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f543c0e15b [#10623] fix(core): reverse cleanup ordering in
JobManager.cleanUpStagingDirs (#10627)
f543c0e15b is described below
commit f543c0e15bc5f7721d3455b4cdd0e61071cf96f0
Author: Roshan Banisetti <[email protected]>
AuthorDate: Wed Apr 8 00:20:09 2026 -0600
[#10623] fix(core): reverse cleanup ordering in
JobManager.cleanUpStagingDirs (#10627)
### What changes were proposed in this pull request?
Reversed the cleanup order in `JobManager.cleanUpStagingDirs()` so that the
staging directory is deleted before the job entity record. Added a unit
test verifying that, when directory deletion fails, the entity record is
preserved so that a later cleanup run can retry the deletion.
### Why are the changes needed?
The previous order deleted the job entity first and the staging
directory second. If `FileUtils.deleteDirectory()` failed with an
`IOException`, the error was only logged. Subsequent cleanup runs
discover candidates via `entityStore.list()`, and by then the entity had
already been removed. The orphaned staging directory was therefore
invisible to those runs and was left behind indefinitely.
Fix: #10623
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added `testCleanUpStagingDirsDeletesDirectoryBeforeEntity`, which mocks
`FileUtils.deleteDirectory` to throw an `IOException` and verifies that
`entityStore.delete` is never called. The existing `testCleanUpStagingDirs`
still passes.
Signed-off-by: Roshan1299 <[email protected]>
---
.../java/org/apache/gravitino/job/JobManager.java | 8 ++--
.../org/apache/gravitino/job/TestJobManager.java | 48 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 95b90e89dc..7d79ffa416 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -648,17 +648,17 @@ public class JobManager implements JobOperationDispatcher
{
finishedJobs.forEach(
job -> {
try {
- entityStore.delete(
- NameIdentifierUtil.ofJob(metalake, job.name()),
Entity.EntityType.JOB);
-
String jobStagingPath =
stagingDir.getAbsolutePath()
+ String.format(JOB_STAGING_DIR, metalake,
job.jobTemplateName(), job.id());
File jobStagingDir = new File(jobStagingPath);
if (jobStagingDir.exists()) {
FileUtils.deleteDirectory(jobStagingDir);
- LOG.info("Deleted job staging directory {} for job {}",
jobStagingPath, job.name());
}
+
+ entityStore.delete(
+ NameIdentifierUtil.ofJob(metalake, job.name()),
Entity.EntityType.JOB);
+ LOG.info("Deleted job staging directory {} for job {}",
jobStagingPath, job.name());
} catch (IOException e) {
LOG.error("Failed to delete job and staging directory for job
{}", job.name(), e);
}
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 2dc4af741f..172925ab0e 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -633,6 +633,54 @@ public class TestJobManager {
});
}
+ @Test
+ public void testCleanUpStagingDirsDeletesDirectoryBeforeEntity() throws
IOException {
+ JobEntity finishedJob = newJobEntity("shell_job",
JobHandle.Status.SUCCEEDED);
+ BaseMetalake mockMetalakeEntity =
+ BaseMetalake.builder()
+ .withName(metalake)
+ .withId(idGenerator.nextId())
+ .withVersion(SchemaVersion.V_0_1)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .build();
+ when(entityStore.list(Namespace.empty(), BaseMetalake.class,
Entity.EntityType.METALAKE))
+ .thenReturn(ImmutableList.of(mockMetalakeEntity));
+ mockedMetalake
+ .when(() -> MetalakeManager.listInUseMetalakes(entityStore))
+ .thenReturn(ImmutableList.of(metalake));
+ when(jobManager.listJobs(metalake,
Optional.empty())).thenReturn(ImmutableList.of(finishedJob));
+
+ // Create the staging directory so FileUtils.deleteDirectory is actually
invoked
+ String jobStagingPath =
+ testStagingDir
+ + File.separator
+ + metalake
+ + File.separator
+ + finishedJob.jobTemplateName()
+ + File.separator
+ + JobHandle.JOB_ID_PREFIX
+ + finishedJob.id();
+ File jobStagingDir = new File(jobStagingPath);
+ Assertions.assertTrue(jobStagingDir.mkdirs());
+
+ // Simulate IOException when deleting the staging directory
+ try (MockedStatic<org.apache.commons.io.FileUtils> mockedFileUtils =
+ mockStatic(org.apache.commons.io.FileUtils.class)) {
+ mockedFileUtils
+ .when(() ->
org.apache.commons.io.FileUtils.deleteDirectory(any(File.class)))
+ .thenThrow(new IOException("Simulated IO failure"));
+
+ Awaitility.await()
+ .atMost(3, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ jobManager.cleanUpStagingDirs();
+ // Entity should NOT be deleted because directory deletion
failed
+ verify(entityStore, never()).delete(any(), any());
+ });
+ }
+ }
+
@Test
public void testUpdateShellJobTemplateEntity() {
String jobTemplateName = "old_shell_job";