This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit fb02c2bf8316a0cbe721c733e4b02ad45568d86e
Author: luoping.zhang <luoping.zh...@kyligence.io>
AuthorDate: Sun Oct 8 17:34:46 2023 +0800

    KYLIN-5854 In the R/W cluster, refresh the snapshots automatically
---
 .../SnapshotSourceTableStatsServiceTest.scala       |  8 ++++----
 .../service/SnapshotSourceTableStatsService.java    | 21 ++++++++++++++-------
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
index 5c841c02e9..4bba544cdf 100644
--- a/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
+++ b/src/query-server/src/test/scala/org/apache/kylin/rest/service/SnapshotSourceTableStatsServiceTest.scala
@@ -203,7 +203,7 @@ class SnapshotSourceTableStatsServiceTest extends 
SparderBaseFunSuite with Local
       init(tableName, table.database) {
         val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT)
         val locationPath = table.location.getPath
-        val locationFilesStatus: util.List[FileStatus] = 
snapshotSourceTableStatsService.getLocationFileStatus(locationPath)
+        val locationFilesStatus: util.List[FileStatus] = 
snapshotSourceTableStatsService.getLocationFileStatus(locationPath, config)
         val snapshotTablesLocationsJson = 
snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, 
config,
           locationFilesStatus)
         snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, 
tableIdentity, snapshotTablesLocationsJson)
@@ -233,7 +233,7 @@ class SnapshotSourceTableStatsServiceTest extends 
SparderBaseFunSuite with Local
         try {
           val tableIdentity = table.qualifiedName.toLowerCase(Locale.ROOT)
           val locationPath = table.location.getPath
-          val locationFilesStatus: util.List[FileStatus] = 
snapshotSourceTableStatsService.getLocationFileStatus(locationPath)
+          val locationFilesStatus: util.List[FileStatus] = 
snapshotSourceTableStatsService.getLocationFileStatus(locationPath, config)
           val snapshotTablesLocationsJson = 
snapshotSourceTableStatsService.createSnapshotSourceTableStats(locationPath, 
config,
             locationFilesStatus)
           
snapshotSourceTableStatsService.writeSourceTableStats(DEFAULT_PROJECT, 
tableIdentity, snapshotTablesLocationsJson)
@@ -319,7 +319,7 @@ class SnapshotSourceTableStatsServiceTest extends 
SparderBaseFunSuite with Local
         val needCheckPartitions = partitions.asScala.sortBy(partition => 
partition.createTime).reverse
           .slice(0, config.getSnapshotAutoRefreshFetchPartitionsCount).asJava
 
-        
snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions,
 needSavePartitionsFilesStatus)
+        
snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions,
 needSavePartitionsFilesStatus, config)
         for (partition <- partitions.asScala) {
           
snapshotSourceTableStatsService.createPartitionSnapshotSourceTableStats(partition,
 needSavePartitionsFilesStatus,
             snapshotTablesLocationsJson, config)
@@ -369,7 +369,7 @@ class SnapshotSourceTableStatsServiceTest extends 
SparderBaseFunSuite with Local
           val needCheckPartitions = partitions.asScala.sortBy(partition => 
partition.createTime).reverse
             .slice(0, config.getSnapshotAutoRefreshFetchPartitionsCount).asJava
 
-          
snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions,
 needSavePartitionsFilesStatus)
+          
snapshotSourceTableStatsService.putNeedSavePartitionsFilesStatus(needCheckPartitions,
 needSavePartitionsFilesStatus, config)
           for (partition <- partitions.asScala) {
             
snapshotSourceTableStatsService.createPartitionSnapshotSourceTableStats(partition,
 needSavePartitionsFilesStatus,
               snapshotTablesLocationsJson, config)
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
index b67c725d28..267baa81a6 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/SnapshotSourceTableStatsService.java
@@ -203,7 +203,7 @@ public class SnapshotSourceTableStatsService extends 
BasicService {
             val table = tableCatalog.loadTable(identifier);
             var location = table.properties().get("location");
             if (tableCatalog.getClass().toString().contains("iceberg"))
-                location = location + "/data";
+                location = location + "/metadata";
             return checkTableLocation(project, location, projectConfig, 
catalogName + "." + identifier.toString());
         }
         throw new KylinRuntimeException("unsupported catalog:" + catalog);
@@ -280,7 +280,7 @@ public class SnapshotSourceTableStatsService extends 
BasicService {
     public boolean checkLocation(String location, List<FileStatus> filesStatus,
             Map<String, SnapshotSourceTableStats> 
snapshotSourceTableStatsJson, KylinConfig config) throws IOException {
         log.info("check table/partition location: {}", location);
-        filesStatus.addAll(getLocationFileStatus(location));
+        filesStatus.addAll(getLocationFileStatus(location, config));
         // check file count
         val sourceTableStats = snapshotSourceTableStatsJson.get(location);
         if (sourceTableStats == null) {
@@ -386,7 +386,7 @@ public class SnapshotSourceTableStatsService extends 
BasicService {
         val needCheckPartitions = partitions.stream()
                 .sorted((ctp1, ctp2) -> Long.compare(ctp2.createTime(), 
ctp1.createTime()))
                 
.limit(config.getSnapshotAutoRefreshFetchPartitionsCount()).collect(Collectors.toList());
-        putNeedSavePartitionsFilesStatus(needCheckPartitions, 
needSavePartitionsFilesStatus);
+        putNeedSavePartitionsFilesStatus(needCheckPartitions, 
needSavePartitionsFilesStatus, config);
 
         // check partition count
         if (partitions.size() != snapshotSourceTableStatsJson.size()) {
@@ -421,16 +421,23 @@ public class SnapshotSourceTableStatsService extends 
BasicService {
     }
 
     public void putNeedSavePartitionsFilesStatus(List<CatalogTablePartition> 
partitions,
-            Map<String, List<FileStatus>> locationsFileStatusMap) throws 
IOException {
+            Map<String, List<FileStatus>> locationsFileStatusMap, KylinConfig 
config) throws IOException {
         for (CatalogTablePartition partition : partitions) {
-            val filesStatus = 
getLocationFileStatus(partition.location().getPath());
+            val filesStatus = 
getLocationFileStatus(partition.location().getPath(), config);
             locationsFileStatusMap.put(partition.location().getPath(), 
filesStatus);
         }
     }
 
-    public List<FileStatus> getLocationFileStatus(String location) throws 
IOException {
+    public List<FileStatus> getLocationFileStatus(String location, KylinConfig 
config) throws IOException {
+        var fileSystem = 
StringUtils.isBlank(config.getWriteClusterWorkingDir()) ? 
HadoopUtil.getWorkingFileSystem()
+                : HadoopUtil.getWriteClusterFileSystem();
+
         val sourceTableStatsPath = new Path(location);
-        val fileSystem = 
sourceTableStatsPath.getFileSystem(SparderEnv.getHadoopConfiguration());
+        val pathSchema = sourceTableStatsPath.toUri().getScheme();
+        val fileSchema = fileSystem.getUri().getScheme();
+        if (pathSchema != null && !pathSchema.equalsIgnoreCase(fileSchema)) {
+            fileSystem = 
sourceTableStatsPath.getFileSystem(SparderEnv.getHadoopConfiguration());
+        }
         if (!fileSystem.exists(sourceTableStatsPath)) {
             return Collections.emptyList();
         }

Reply via email to