This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit fb02c2bf8316a0cbe721c733e4b02ad45568d86e Author: luoping.zhang <luoping.zh...@kyligence.io> AuthorDate: Sun Oct 8 17:34:46 2023 +0800 KYLIN-5854 In the R/W cluster, refreshed the snapshots automatically --- .../SnapshotSourceTableStatsServiceTest.scala | 8 ++++---- .../service/SnapshotSourceTableStatsService.java | 21 ++++++++++++++------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala index 5c841c02e9..4bba544cdf 100644 --- a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala +++ b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala @@ -203,7 +203,7 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local init(tableName, table.database) { val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT) val locationPath = table.location.getPath - val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath) + val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath, config) val snapshotTablesLocationsJson = snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config, locationFilesStatus) snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, tableIdentity, snapshotTablesLocationsJson) @@ -233,7 +233,7 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local try { val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT) val locationPath = table.location.getPath - val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath) + val locationFilesStatus: util.List[FileStatus] = snapshotSourceTableStatsService.getLocationFileStatus(locationPath, config) val snapshotTablesLocationsJson = snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, config, locationFilesStatus) snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, tableIdentity, snapshotTablesLocationsJson) @@ -319,7 +319,7 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local val needCheckPartitions = partitions.asScala.sortBy(partition => partition.createTime).reverse .slice(0, config.getSnapshotAutoRefreshFetchPartitionsCount).asJava - snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions, needSavePartitionsFilesStatus) + snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions, needSavePartitionsFilesStatus, config) for (partition <- partitions.asScala) { snapshotSourceTableStatsService.createPartitionSnapshotSourceTableStats(partition, needSavePartitionsFilesStatus, snapshotTablesLocationsJson, config) @@ -369,7 +369,7 @@ class SnapshotSourceTableStatsServiceTest extends SparderBaseFunSuite with Local val needCheckPartitions = partitions.asScala.sortBy(partition => partition.createTime).reverse .slice(0, config.getSnapshotAutoRefreshFetchPartitionsCount).asJava - snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions, needSavePartitionsFilesStatus) + snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions, needSavePartitionsFilesStatus, config) for (partition <- partitions.asScala) { snapshotSourceTableStatsService.createPartitionSnapshotSourceTableStats(partition, needSavePartitionsFilesStatus, snapshotTablesLocationsJson, config) diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java index b67c725d28..267baa81a6 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java @@ -203,7 +203,7 @@ public class SnapshotSourceTableStatsService extends BasicService { val table = tableCatalog.loadTable(identifier); var location = table.properties().get("location"); if (tableCatalog.getClass().toString().contains("iceberg")) - location = location + "/data"; + location = location + "/metadata"; return checkTableLocation(project, location, projectConfig, catalogName + "." + identifier.toString()); } throw new KylinRuntimeException("unsupported catalog:" + catalog); @@ -280,7 +280,7 @@ public class SnapshotSourceTableStatsService extends BasicService { public boolean checkLocation(String location, List<FileStatus> filesStatus, Map<String, SnapshotSourceTableStats> snapshotSourceTableStatsJson, KylinConfig config) throws IOException { log.info("check table/partition location: {}", location); - filesStatus.addAll(getLocationFileStatus(location)); + filesStatus.addAll(getLocationFileStatus(location, config)); // check file count val sourceTableStats = snapshotSourceTableStatsJson.get(location); if (sourceTableStats == null) { @@ -386,7 +386,7 @@ public class SnapshotSourceTableStatsService extends BasicService { val needCheckPartitions = partitions.stream() .sorted((ctp1, ctp2) -> Long.compare(ctp2.createTime(), ctp1.createTime())) .limit(config.getSnapshotAutoRefreshFetchPartitionsCount()).collect(Collectors.toList()); - putNeedSavePartitionsFilesStatus(needCheckPartitions, needSavePartitionsFilesStatus); + putNeedSavePartitionsFilesStatus(needCheckPartitions, needSavePartitionsFilesStatus, config); // check partition count if (partitions.size() != snapshotSourceTableStatsJson.size()) { @@ -421,16 +421,23 @@ public class SnapshotSourceTableStatsService extends BasicService { } public void putNeedSavePartitionsFilesStatus(List<CatalogTablePartition> partitions, - Map<String, List<FileStatus>> locationsFileStatusMap) throws IOException { + Map<String, List<FileStatus>> locationsFileStatusMap, KylinConfig config) throws IOException { for (CatalogTablePartition partition : partitions) { - val filesStatus = getLocationFileStatus(partition.location().getPath()); + val filesStatus = getLocationFileStatus(partition.location().getPath(), config); locationsFileStatusMap.put(partition.location().getPath(), filesStatus); } } - public List<FileStatus> getLocationFileStatus(String location) throws IOException { + public List<FileStatus> getLocationFileStatus(String location, KylinConfig config) throws IOException { + var fileSystem = StringUtils.isBlank(config.getWriteClusterWorkingDir()) ? HadoopUtil.getWorkingFileSystem() + : HadoopUtil.getWriteClusterFileSystem(); + val sourceTableStatsPath = new Path(location); - val fileSystem = sourceTableStatsPath.getFileSystem(SparderEnv.getHadoopConfiguration()); + val pathSchema = sourceTableStatsPath.toUri().getScheme(); + val fileSchema = fileSystem.getUri().getScheme(); + if (pathSchema != null && !pathSchema.equalsIgnoreCase(fileSchema)) { + fileSystem = sourceTableStatsPath.getFileSystem(SparderEnv.getHadoopConfiguration()); + } if (!fileSystem.exists(sourceTableStatsPath)) { return Collections.emptyList(); }