pankaj72981 commented on code in PR #5833:
URL: https://github.com/apache/hbase/pull/5833#discussion_r2415741629
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java:
##########
@@ -52,7 +71,21 @@ public MobFileCleanerChore(HMaster master) {
this.master = master;
cleaner = new ExpiredMobFileCleaner();
cleaner.setConf(master.getConfiguration());
+ threadCount =
master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
+ MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
+ if (threadCount <= 1) {
Review Comment:
The equality check is not required here; `threadCount < 1` should be sufficient.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java:
##########
@@ -83,29 +116,91 @@ protected void chore() {
LOG.error("MobFileCleanerChore failed", e);
return;
}
+ List<Future<?>> futureList = new ArrayList<>(map.size());
for (TableDescriptor htd : map.values()) {
- for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
- if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
- try {
- cleaner.cleanExpiredMobFiles(htd, hcd);
- } catch (IOException e) {
- LOG.error("Failed to clean the expired mob files table={}
family={}",
- htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
- }
- }
- }
+ Future<?> future = executor.submit(() -> handleOneTable(htd));
+ futureList.add(future);
+ }
+
+ for (Future<?> future : futureList) {
try {
- // Now clean obsolete files for a table
- LOG.info("Cleaning obsolete MOB files from table={}",
htd.getTableName());
- try (final Admin admin = master.getConnection().getAdmin()) {
-
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(),
htd.getTableName(),
- admin);
+ future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("MobFileCleanerChore interrupted while waiting for futures",
e);
+ Thread.currentThread().interrupt();
+ cancelAllFutures(futureList);
+ break;
+ } catch (ExecutionException e) {
+ LOG.error("Exception during execution of MobFileCleanerChore task", e);
+ } catch (TimeoutException e) {
+ LOG.error("MobFileCleanerChore timed out waiting for a task to
complete", e);
+ }
+ }
+ }
+
+ private void cancelAllFutures(List<Future<?>> futureList) {
+ for (Future<?> f : futureList) {
+ if (!f.isDone()) {
+ f.cancel(true); // interrupt running tasks
+ }
+ }
+ LOG.info("Cancelled all pending mob file cleaner tasks");
Review Comment:
Can we include the number of pending futures in this log message?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]