yihua commented on code in PR #18337:
URL: https://github.com/apache/hudi/pull/18337#discussion_r3035624129
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -214,12 +235,58 @@ protected Option<HoodieCleanerPlan> requestClean() {
     }
     final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
     Option<HoodieCleanerPlan> option = Option.empty();
-    if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
-        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
+        || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
       // Only create cleaner plan which does some work
       option = Option.of(cleanerPlan);
     }
+    // If cleaner plan returned an empty list, incremental clean is enabled and there was no
+    // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
+    // create a dummy clean to avoid full scan in the future.
+    // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
+    // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset.
+    // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
+    if (config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) {
+      // Only create an empty clean commit if earliestInstantToRetain is present in the plan
+      boolean eligibleForEmptyCleanCommit = true;
+
+      // if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit
+      Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().lastInstant();
+      if (lastCleanInstant.isPresent()) {
+        try {
+          ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
+          long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
+          long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
+          eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs();
+        } catch (ParseException e) {
+          log.error("Unable to parse last clean commit time", e);
+          throw new HoodieException("Unable to parse last clean commit time", e);
+        }
+      }
+      if (eligibleForEmptyCleanCommit) {
+        // Ensure earliestCommitToRetain doesn't go backwards when user changes cleaner configuration
+        if (lastCleanInstant.isPresent()) {
+          try {
+            HoodieCleanMetadata lastCleanMetadata = table.getActiveTimeline().readCleanMetadata(lastCleanInstant.get());
+            String previousEarliestCommitToRetain = lastCleanMetadata.getEarliestCommitToRetain();
+            String currentEarliestCommitToRetain = cleanerPlan.getEarliestInstantToRetain().getTimestamp();
+            if (compareTimestamps(currentEarliestCommitToRetain, LESSER_THAN, previousEarliestCommitToRetain)) {
Review Comment:
🤖 Could `lastCleanMetadata.getEarliestCommitToRetain()` be null here? For
tables upgraded from older Hudi versions, or for cleans where the policy didn't
track this field, the metadata may have a null `earliestCommitToRetain`. If so,
`compareTimestamps` will throw an NPE, since `LESSER_THAN` delegates to
`String.compareTo()`. It might be worth adding a null guard, e.g. skipping the
backwards check when `previousEarliestCommitToRetain` is null.
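A minimal, self-contained sketch of such a guard, assuming instant times are fixed-width timestamp strings so that plain `String` ordering matches chronological order. `EctrGuard` and `goesBackwards` are hypothetical names for illustration, not existing Hudi APIs:

```java
public final class EctrGuard {

  private EctrGuard() {
  }

  /**
   * Hypothetical helper: returns true only when both values are present and the
   * current earliest-commit-to-retain sorts strictly before the previous one.
   * A null previous value (e.g. clean metadata that never recorded the field)
   * skips the check instead of throwing a NullPointerException.
   */
  public static boolean goesBackwards(String currentEctr, String previousEctr) {
    if (currentEctr == null || previousEctr == null) {
      return false;
    }
    // Lexicographic comparison, the same String.compareTo() ordering LESSER_THAN relies on.
    return currentEctr.compareTo(previousEctr) < 0;
  }

  public static void main(String[] args) {
    System.out.println(goesBackwards("20240101000000000", "20240102000000000")); // true
    System.out.println(goesBackwards("20240103000000000", "20240102000000000")); // false
    System.out.println(goesBackwards("20240101000000000", null));                // false
  }
}
```

In the PR itself this would amount to checking `previousEarliestCommitToRetain != null` before calling `compareTimestamps`.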
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -214,12 +235,58 @@ protected Option<HoodieCleanerPlan> requestClean() {
     }
     final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
     Option<HoodieCleanerPlan> option = Option.empty();
-    if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
-        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
+        || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
       // Only create cleaner plan which does some work
       option = Option.of(cleanerPlan);
     }
+    // If cleaner plan returned an empty list, incremental clean is enabled and there was no
+    // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
+    // create a dummy clean to avoid full scan in the future.
+    // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
+    // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset.
Review Comment:
🤖 The empty clean eligibility block runs even when `option` is already
non-empty (i.e., there are real files or partitions to clean). It's harmless
but does unnecessary work reading last-clean metadata. Could you gate this with
`if (option.isEmpty() && ...)`?
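A rough sketch of the suggested control flow, assuming `java.util.Optional` as a stand-in for Hudi's `Option` and a `BooleanSupplier` as a stand-in for the eligibility block (timeline lookup plus reading the last clean metadata). `EmptyCleanGate` and `choosePlan` are hypothetical names; the point is only that the expensive check is never evaluated when the plan already has work:

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;

public final class EmptyCleanGate {

  private EmptyCleanGate() {
  }

  /**
   * Hypothetical sketch: return the plan that has real work if there is one,
   * otherwise fall back to an empty clean only if the (lazily evaluated)
   * eligibility check passes.
   */
  static <P> Optional<P> choosePlan(Optional<P> planWithWork, P candidatePlan,
                                    boolean incrementalEnabled, BooleanSupplier eligibleForEmptyClean) {
    if (planWithWork.isPresent()) {
      // Real files or partitions to clean: skip the eligibility check entirely.
      return planWithWork;
    }
    if (incrementalEnabled && eligibleForEmptyClean.getAsBoolean()) {
      // Schedule an empty clean so later runs avoid a full scan.
      return Optional.of(candidatePlan);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    int[] checks = {0};
    BooleanSupplier eligibility = () -> {
      checks[0]++;
      return true;
    };
    System.out.println(choosePlan(Optional.of("plan-with-files"), "empty-plan", true, eligibility)); // Optional[plan-with-files]
    System.out.println(choosePlan(Optional.empty(), "empty-plan", true, eligibility));                // Optional[empty-plan]
    System.out.println(checks[0]); // 1
  }
}
```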
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -214,15 +238,52 @@ protected Option<HoodieCleanerPlan> requestClean() {
     }
     final HoodieCleanerPlan cleanerPlan = requestClean(cleanerEngineContext);
     Option<HoodieCleanerPlan> option = Option.empty();
-    if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
-        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
+    if ((cleanerPlan.getPartitionsToBeDeleted() != null && !cleanerPlan.getPartitionsToBeDeleted().isEmpty())
+        || (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
+        && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0)) {
       // Only create cleaner plan which does some work
       option = Option.of(cleanerPlan);
     }
+    // If cleaner plan returned an empty list, incremental clean is enabled and there was no
+    // completed clean created in the last X hours configured in MAX_DURATION_TO_CREATE_EMPTY_CLEAN,
+    // create a dummy clean to avoid full scan in the future.
+    // Note: For a dataset with incremental clean enabled, that does not receive any updates, cleaner plan always comes
+    // with an empty list of files to be cleaned. CleanActionExecutor would never be invoked for this dataset.
+    // To avoid fullscan on the dataset with every ingestion run, empty clean commit is created here.
+    if (config.incrementalCleanerModeEnabled() && cleanerPlan.getEarliestInstantToRetain() != null && config.maxDurationToCreateEmptyCleanMs() > 0) {
+      // Only create an empty clean commit if earliestInstantToRetain is present in the plan
+      boolean eligibleForEmptyCleanCommit = true;
+      // if there is no previous clean instant or the previous clean instant was before the configured max duration, schedule an empty clean commit
+      Option<HoodieInstant> lastCleanInstant = table.getCleanTimeline().lastInstant();
+      if (lastCleanInstant.isPresent()) {
+        try {
+          ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
+          long currentCleanTimeMs = latestDateTime.toInstant().toEpochMilli();
+          long lastCleanTimeMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(lastCleanInstant.get().requestedTime()).toInstant().toEpochMilli();
+          eligibleForEmptyCleanCommit = currentCleanTimeMs - lastCleanTimeMs > config.maxDurationToCreateEmptyCleanMs();
+        } catch (ParseException e) {
+          log.warn("Unable to parse last clean commit time", e);
+        }
+      }
+      if (eligibleForEmptyCleanCommit) {
+        log.warn("Creating an empty clean instant with earliestCommitToRetain of {}", cleanerPlan.getEarliestInstantToRetain().getTimestamp());
+        return Option.of(cleanerPlan);
Review Comment:
🤖 Good catch on the ECTR (earliestCommitToRetain) backwards scenario. The fix at
line 247 addresses it, but I think there's a related issue:
`table.getCleanTimeline().lastInstant()` (line 258) returns instants in any
state: requested, inflight, or completed. If the last clean instant is
inflight, `readCleanMetadata()` at line 273 would try to deserialize a
`HoodieCleanerPlan` as `HoodieCleanMetadata`, which would fail. This should
probably use `table.getCleanTimeline().filterCompletedInstants().lastInstant()`
instead.
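To illustrate the difference, here is a small self-contained model of a clean timeline. `CleanInstant`, `State`, `lastInstant` and `lastCompletedInstant` are hypothetical stand-ins that only mimic the behaviour of `lastInstant()` versus `filterCompletedInstants().lastInstant()`; they are not Hudi's `HoodieInstant` or timeline classes:

```java
import java.util.List;
import java.util.Optional;

public final class LastCompletedClean {

  /** Hypothetical instant states, mirroring requested/inflight/completed. */
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  /** Hypothetical model of a clean instant; not Hudi's HoodieInstant. */
  static final class CleanInstant {
    final String requestedTime;
    final State state;

    CleanInstant(String requestedTime, State state) {
      this.requestedTime = requestedTime;
      this.state = state;
    }

    @Override
    public String toString() {
      return requestedTime + "/" + state;
    }
  }

  /** Last instant in timeline order, regardless of state. */
  static Optional<CleanInstant> lastInstant(List<CleanInstant> timeline) {
    return timeline.isEmpty() ? Optional.empty() : Optional.of(timeline.get(timeline.size() - 1));
  }

  /** Last instant whose state is COMPLETED; earlier in-progress instants are skipped. */
  static Optional<CleanInstant> lastCompletedInstant(List<CleanInstant> timeline) {
    for (int i = timeline.size() - 1; i >= 0; i--) {
      if (timeline.get(i).state == State.COMPLETED) {
        return Optional.of(timeline.get(i));
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<CleanInstant> timeline = List.of(
        new CleanInstant("20240101000000000", State.COMPLETED),
        new CleanInstant("20240102000000000", State.INFLIGHT));
    // Only a completed clean instant carries HoodieCleanMetadata that is safe to read.
    System.out.println(lastInstant(timeline).get());          // 20240102000000000/INFLIGHT
    System.out.println(lastCompletedInstant(timeline).get()); // 20240101000000000/COMPLETED
  }
}
```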
--
This is an automated message from the Apache Git Service.