This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new a7b7edb Add option to remove stale files under job_tmp for StorageCleanupJob (#1732) a7b7edb is described below commit a7b7edba90e8810bad3b9a76cbf64241da8de4b5 Author: Xiaoxiang Yu <x...@apache.org> AuthorDate: Wed Aug 18 12:52:13 2021 +0800 Add option to remove stale files under job_tmp for StorageCleanupJob (#1732) * Add option to remove stale files under job_tmp for StorageCleanupJob * fix code style * fix cleanupJobTmp * fix sample.sh * fix kylin.engine.build-base-cuboid-enabled conflict with cube planner * minor fix * Fix optimize job * fix typo Co-authored-by: yaqian.zhang <598593...@qq.com> --- .../engine/mr/common/CuboidRecommenderUtil.java | 13 +- build/conf/spark-driver-log4j.properties | 6 + .../kylin/common/util/CliCommandExecutor.java | 2 +- .../src/main/resources/kylin-defaults.properties | 1 + kylin-it/DEPRECATED_MODULE | 4 +- .../engine/spark/utils/UpdateMetadataUtil.java | 42 +++---- .../apache/kylin/rest/job/StorageCleanupJob.java | 131 +++++++++++++++------ .../kylin/rest/job/StorageCleanupJobTest.java | 2 +- 8 files changed, 134 insertions(+), 67 deletions(-) diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java index f6ae332..71d2fd1 100644 --- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java +++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java @@ -51,8 +51,9 @@ public class CuboidRecommenderUtil { } CubeInstance cube = segment.getCubeInstance(); long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); - if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null - || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { + if ((cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null + || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) + && segment.getConfig().isBuildBaseCuboid()) { logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO); return null; } @@ -77,7 +78,8 @@ public class CuboidRecommenderUtil { Pair<Map<Long, Long>, Map<Long, Double>> statsPair = CuboidStatsReaderUtil .readCuboidStatsAndSizeFromCube(currentCuboids, cube); long baseCuboid = cuboidScheduler.getBaseCuboidId(); - if (statsPair.getFirst().get(baseCuboid) == null || statsPair.getFirst().get(baseCuboid) == 0L) { + if ((statsPair.getFirst().get(baseCuboid) == null || statsPair.getFirst().get(baseCuboid) == 0L) + && cube.getConfig().isBuildBaseCuboid()) { logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO); return null; } @@ -121,8 +123,9 @@ public class CuboidRecommenderUtil { } CubeInstance cube = segment.getCubeInstance(); long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); - if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null - || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) { + if ((cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null + || cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) + && segment.getConfig().isBuildBaseCuboid()) { logger.info(BASE_CUBOID_COUNT_IN_CUBOID_STATISTICS_IS_ZERO); return null; } diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties index 04c648c..364a6ed 100644 --- a/build/conf/spark-driver-log4j.properties +++ b/build/conf/spark-driver-log4j.properties @@ -22,6 +22,11 @@ log4j.logger.org.apache.kylin=DEBUG log4j.logger.org.springframework=WARN log4j.logger.org.apache.spark=WARN +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n + # hdfs file appender log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled} @@ -36,6 +41,7 @@ log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayou #Don't add line number (%L) as it's too costly! log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n +## Log saved under $KYLIN_HOME/logs/spark log4j.appender.logFile=org.apache.log4j.FileAppender log4j.appender.logFile.Threshold=DEBUG log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java index 23089b0..54bab60 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java @@ -152,7 +152,7 @@ public class CliCommandExecutor { } if (Thread.interrupted()) { - logger.info("CliCommandExecutor is interruppted by other, kill the sub process: " + command); + logger.info("CliCommandExecutor is interrupted by other, kill the sub process: " + command); proc.destroy(); try { Thread.sleep(1000); diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 384466f..badfa69 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -83,6 +83,7 @@ kylin.web.link-hadoop= kylin.web.link-diagnostic= kylin.web.contact-mail= kylin.server.external-acl-provider= +kylin.source.hive.database-for-flat-table=default # Default time filter for job list, 0->current day, 1->last one day, 2->last one week, 3->last one year, 4->all kylin.web.default-time-filter=1 diff --git a/kylin-it/DEPRECATED_MODULE b/kylin-it/DEPRECATED_MODULE index b593cf2..e3c19f4 100644 --- a/kylin-it/DEPRECATED_MODULE +++ b/kylin-it/DEPRECATED_MODULE @@ -16,4 +16,6 @@ # limitations under the License. # -This maven module is deprecated and maybe be removed in the future. \ No newline at end of file +This maven module is deprecated and maybe be removed in the future. + +Remind that the path 'kylin-it/src/test/resources/query' is still be used by module 'kylin-spark-project/kylin-spark-test' . \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java index a02530e..6921444 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java @@ -113,30 +113,30 @@ public class UpdateMetadataUtil { CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config); Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); if (origSegStatsReader.getCuboidRowHLLCounters() == null) { - throw new IllegalArgumentException( + logger.warn( "Cuboid statistics of original segment do not exist. Please check the config of kylin.engine.segment-statistics-enabled."); - } - addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap); - addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap); - - Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND); - Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size()); - for (long cuboid : recommendCuboids) { - HLLCounter hll = cuboidHLLMap.get(cuboid); - if (hll == null) { - logger.warn("Cannot get the row count stats for cuboid " + cuboid); - } else { - resultCuboidHLLMap.put(cuboid, hll); + } else { + addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap); + addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap); + + Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND); + Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size()); + for (long cuboid : recommendCuboids) { + HLLCounter hll = cuboidHLLMap.get(cuboid); + if (hll == null) { + logger.warn("Cannot get the row count stats for cuboid " + cuboid); + } else { + resultCuboidHLLMap.put(cuboid, hll); + } } + if (fs.exists(statisticsFile)) { + fs.delete(statisticsFile, false); + } + CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir), + resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount()); + FSDataInputStream is = fs.open(statisticsFile); + ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis()); } - if (fs.exists(statisticsFile)) { - fs.delete(statisticsFile, false); - } - CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir), - resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount()); - FSDataInputStream is = fs.open(statisticsFile); - ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis()); - toUpdateSeg.setStatus(SegmentStatusEnum.READY_PENDING); } else { toUpdateSeg.setStatus(SegmentStatusEnum.READY); diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java index f4ad269..95a7a48 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java +++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java @@ -43,43 +43,75 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.List; import java.util.stream.Collectors; +/** + * Please update https://cwiki.apache.org/confluence/display/KYLIN/How+to+clean+up+storage+in+Kylin+4 + * if you change this class. + */ public class StorageCleanupJob extends AbstractApplication { private static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class); + /** + * It is considered quite safe to remove job_tmp path which was created 1 week ago . + */ + public static final int DEFAULT_CLEANUP_HOUR_THRESHOLD = 24 * 7; + public static final boolean DEFAULT_CLEANUP_DICT = true; + public static final boolean DEFAULT_CLEANUP_SNAPSHOT = true; + public static final boolean DEFAULT_CLEANUP_JOB_TMP = false; + public static final boolean DEFAULT_CLEANUP = false; + private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/"; + private static final String TABLE_SNAPSHOT_PREFIX = "/table_snapshot/"; + @SuppressWarnings("static-access") - protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false) - .withDescription("Delete the unused storage").create("delete"); + protected static final Option OPTION_DELETE = OptionBuilder.withArgName("delete") + .hasArg().isRequired(false) + .withType(Boolean.class.getName()) + .withDescription("Boolean, whether or not to do real delete operation. Default value is " + DEFAULT_CLEANUP + ", means a dry run.") + .create("delete"); + @SuppressWarnings("static-access") protected static final Option OPTION_CLEANUP_TABLE_SNAPSHOT = OptionBuilder.withArgName("cleanupTableSnapshot") - .hasArg().isRequired(false).withDescription("Delete the unused storage").create("cleanupTableSnapshot"); - @SuppressWarnings("static-access") - protected static final Option OPTION_CLEANUP_GLOBAL_DICT = OptionBuilder.withArgName("cleanupGlobalDict").hasArg() - .isRequired(false).withDescription("Delete the unused storage").create("cleanupGlobalDict"); + .hasArg().isRequired(false) + .withType(Boolean.class.getName()) + .withDescription("Boolean, whether or not to delete unreferenced snapshot files. Default value is " + DEFAULT_CLEANUP_SNAPSHOT + " .") + .create("cleanupTableSnapshot"); + @SuppressWarnings("static-access") - protected static final Option OPTION_CLEANUP_THRESHOLD_HOUR = OptionBuilder.withArgName("cleanupThreshold").hasArg() - .isRequired(false).withDescription("Delete unused storage that have not been modified in how many hours") - .create("cleanupThreshold"); + protected static final Option OPTION_CLEANUP_GLOBAL_DICT = OptionBuilder.withArgName("cleanupGlobalDict") + .hasArg().isRequired(false) + .withType(Boolean.class.getName()) + .withDescription("Boolean, whether or not to delete unreferenced global dict files. Default value is " + DEFAULT_CLEANUP_DICT + " .") + .create("cleanupGlobalDict"); - private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/"; - private static final String TABLE_SNAPSHOT_PREFIX = "/table_snapshot/"; + @SuppressWarnings("static-access") + protected static final Option OPTION_CLEANUP_JOB_TMP = OptionBuilder.withArgName("cleanupJobTmp") + .hasArg().isRequired(false) + .withType(Boolean.class.getName()) + .withDescription("Boolean, whether or not to delete job tmp files. Default value is " + DEFAULT_CLEANUP_JOB_TMP + " .") + .create("cleanupJobTmp"); - private static final String TABLE_SNAPSHOT = "table snapshot"; - private static final String GLOBAL_DICTIONARY = "global dictionary"; - private static final String SEGMENT_PARQUET_FILE = "segment parquet file"; + @SuppressWarnings("static-access") + protected static final Option OPTION_CLEANUP_THRESHOLD_HOUR = OptionBuilder.withArgName("cleanupThreshold") + .hasArg().isRequired(false) + .withType(Integer.class.getName()) + .withDescription( + "Integer, used to specific delete unreferenced storage that have not been modified before how many hours (recent files are protected). " + + "Default value is " + DEFAULT_CLEANUP_HOUR_THRESHOLD + " hours.") + .create("cleanupThreshold"); final protected KylinConfig config; final protected FileSystem fs; final protected ExecutableManager executableManager; - protected boolean delete = false; - protected boolean cleanupTableSnapshot = true; - protected boolean cleanupGlobalDict = true; - protected int cleanupThreshold = 12; // 12 hour - + protected boolean delete = DEFAULT_CLEANUP; + protected boolean cleanupTableSnapshot = DEFAULT_CLEANUP_SNAPSHOT; + protected boolean cleanupGlobalDict = DEFAULT_CLEANUP_DICT; + protected boolean cleanupJobTmp = DEFAULT_CLEANUP; + protected int cleanupThreshold = DEFAULT_CLEANUP_HOUR_THRESHOLD; protected long storageTimeCut; protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs"); @@ -101,6 +133,7 @@ public class StorageCleanupJob extends AbstractApplication { options.addOption(OPTION_DELETE); options.addOption(OPTION_CLEANUP_GLOBAL_DICT); options.addOption(OPTION_CLEANUP_TABLE_SNAPSHOT); + options.addOption(OPTION_CLEANUP_JOB_TMP); options.addOption(OPTION_CLEANUP_THRESHOLD_HOUR); return options; } @@ -108,13 +141,6 @@ public class StorageCleanupJob extends AbstractApplication { @Override protected void execute(OptionsHelper optionsHelper) throws Exception { logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); - logger.info("delete option value: '" + optionsHelper.getOptionValue(OPTION_DELETE) + "'"); - logger.info("cleanup table snapshot option value: '" - + optionsHelper.getOptionValue(OPTION_CLEANUP_TABLE_SNAPSHOT) + "'"); - logger.info( - "delete global dict option value: '" + optionsHelper.getOptionValue(OPTION_CLEANUP_GLOBAL_DICT) + "'"); - logger.info("delete unused storage that have not been modified in how many hours option value: '" - + optionsHelper.getOptionValue(OPTION_CLEANUP_THRESHOLD_HOUR) + "'"); delete = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DELETE)); if (optionsHelper.hasOption(OPTION_CLEANUP_TABLE_SNAPSHOT)) { cleanupTableSnapshot = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_TABLE_SNAPSHOT)); @@ -122,11 +148,18 @@ public class StorageCleanupJob extends AbstractApplication { if (optionsHelper.hasOption(OPTION_CLEANUP_GLOBAL_DICT)) { cleanupGlobalDict = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_GLOBAL_DICT)); } + if (optionsHelper.hasOption(OPTION_CLEANUP_JOB_TMP)) { + cleanupJobTmp = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_CLEANUP_JOB_TMP)); + } if (optionsHelper.hasOption(OPTION_CLEANUP_THRESHOLD_HOUR)) { cleanupThreshold = Integer.parseInt(optionsHelper.getOptionValue(OPTION_CLEANUP_THRESHOLD_HOUR)); } storageTimeCut = System.currentTimeMillis() - cleanupThreshold * 3600 * 1000L; + Date cleanBeforeDate = new Date(storageTimeCut); + logger.info("===================================================================\n" + + "delete : {}; cleanupTableSnapshot : {}; cleanupGlobalDict : {}; cleanupJobTmp : {}; cleanBeforeDate : {}." + , delete, cleanupTableSnapshot, cleanupGlobalDict, cleanupJobTmp, cleanBeforeDate); cleanup(); } @@ -138,7 +171,7 @@ public class StorageCleanupJob extends AbstractApplication { List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName) .collect(Collectors.toList()); - //clean up deleted projects and cubes + logger.info("Start to clean up unreferenced projects and cubes ..."); List<CubeInstance> cubes = cubeManager.listAllCubes(); Path metadataPath = new Path(config.getHdfsWorkingDirectory()); if (fs.exists(metadataPath)) { @@ -148,7 +181,7 @@ public class StorageCleanupJob extends AbstractApplication { if (eligibleStorage(status)) { String projectName = status.getPath().getName(); if (!projects.contains(projectName)) { - cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE); + deleteOp(status.getPath(), StorageCleanType.PROJECT_DIR); } else { cleanupGlobalDict(projectName, cubes.stream().filter(cube -> projectName.equals(cube.getProject())) @@ -164,14 +197,14 @@ public class StorageCleanupJob extends AbstractApplication { } } - //clean up no used segments + logger.info("Start to clean up no unreferenced segments ..."); for (CubeInstance cube : cubes) { List<String> segments = cube.getSegments().stream().map(segment -> { return segment.getName() + "_" + segment.getStorageLocationIdentifier(); }).collect(Collectors.toList()); String project = cube.getProject(); - //list all segment directory + // list all segment directory Path cubePath = new Path(config.getHdfsWorkingDirectory(project) + "/parquet/" + cube.getName()); if (fs.exists(cubePath)) { FileStatus[] segmentStatus = fs.listStatus(cubePath); @@ -180,13 +213,26 @@ public class StorageCleanupJob extends AbstractApplication { if (eligibleStorage(status)) { String segment = status.getPath().getName(); if (!segments.contains(segment)) { - cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE); + deleteOp(status.getPath(), StorageCleanType.SEGMENT_DIR); } } } } } else { - logger.warn("Cube path doesn't exist! The path is " + cubePath); + logger.warn("Cube path doesn't exist! The path is {}", cubePath); + } + } + + if (cleanupJobTmp) { + logger.info("Start to clean up stale job_tmp ..."); + for (String prj : projects) { + Path prjPath = new Path(config.getJobTmpDir(prj)); + FileStatus[] jobTmpPaths = fs.listStatus(prjPath); + for (FileStatus status : jobTmpPaths) { + if (eligibleStorage(status)) { + deleteOp(status.getPath(), StorageCleanType.JOB_TMP); + } + } } } } @@ -201,7 +247,7 @@ public class StorageCleanupJob extends AbstractApplication { if (eligibleStorage(status)) { String cubeName = status.getPath().getName(); if (!cubes.contains(cubeName)) { - cleanupStorage(status.getPath(), SEGMENT_PARQUET_FILE); + deleteOp(status.getPath(), StorageCleanType.CUBE_DIR); } } } @@ -237,7 +283,7 @@ public class StorageCleanupJob extends AbstractApplication { } for (Path path : toDeleteSnapshot) { - cleanupStorage(path, TABLE_SNAPSHOT); + deleteOp(path, StorageCleanType.TABLE_SNAPSHOT); } } @@ -279,16 +325,16 @@ public class StorageCleanupJob extends AbstractApplication { } for (Path path : toDeleteDict) { - cleanupStorage(path, GLOBAL_DICTIONARY); + deleteOp(path, StorageCleanType.GLOBAL_DICTIONARY); } } - private void cleanupStorage(Path path, String storageType) throws IOException { + private void deleteOp(Path path, StorageCleanType type) throws IOException { if (delete) { - logger.info("Deleting unused {}, {}", storageType, path); + logger.info("Deleting unreferenced {}, {}", type, path); fs.delete(path, true); } else { - logger.info("Dry run, pending delete unused {}, {}", storageType, path); + logger.info("Dry run, pending delete unreferenced path {}, {}", type, path); } } @@ -296,3 +342,12 @@ public class StorageCleanupJob extends AbstractApplication { return status != null && status.getModificationTime() < storageTimeCut; } } + +enum StorageCleanType { + PROJECT_DIR, + GLOBAL_DICTIONARY, + TABLE_SNAPSHOT, + CUBE_DIR, + SEGMENT_DIR, + JOB_TMP +} diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java index 9f1f62d..5121da3 100644 --- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java +++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java @@ -64,7 +64,7 @@ public class StorageCleanupJobTest { prepareHDFSFiles(basePath, mockFs); StorageCleanupJob job = new StorageCleanupJob(kylinConfig, mockFs); - job.execute(new String[] { "--delete", "true" }); + job.execute(new String[] { "--delete", "true", "--cleanupThreshold", "12" }); ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class); verify(mockFs, times(6)).delete(pathCaptor.capture(), eq(true));