This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2e4167126c04b6eec2948d30d328be2af80fc8ed
Author: ChenLiang <31469905+yab...@users.noreply.github.com>
AuthorDate: Thu Dec 8 17:03:34 2022 +0800

    KYLIN-5439 fix read/write cluster db location error
---
 .../apache/kylin/rest/service/SparkDDLTest.java    | 11 ++++++++-
 .../spark/source/NSparkMetadataExplorer.java       | 26 +++++++++++++---------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
index fbdb21528f..d3a1ceba42 100644
--- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
@@ -21,11 +21,14 @@ import java.util.List;
 
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.engine.spark.source.NSparkMetadataExplorer;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.common.SparkDDLTestUtils;
+import org.apache.spark.sql.internal.SQLConf;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -79,7 +82,7 @@ public class SparkDDLTest extends NLocalFileMetadataTestCase {
   }
 
   @Test
-  public void testDDL() {
+  public void testDDL() throws Exception {
     try {
       assertKylinExeption(
           () ->
@@ -127,6 +130,12 @@ public class SparkDDLTest extends NLocalFileMetadataTestCase {
       // ddl description
       List<List<String>> description = ddlService.pluginsDescription("ssb");
       Assert.assertTrue(description.size() > 0);
+
+
+      // read/write cluster
+      SparderEnv.getSparkSession().sessionState().conf()
+          .setConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION(), "hdfs://read");
+      new NSparkMetadataExplorer().checkDatabaseHadoopAccessFast("SSB");
     } finally {
       SparkSession spark = SparderEnv.getSparkSession();
       if (spark != null && !spark.sparkContext().isStopped()) {
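
The test change above pins spark.sql.hive.specific.fs.location to a read-cluster root ("hdfs://read") and then runs the fast database access check against it. A minimal standalone sketch of that setup follows; it uses only classes that appear in this commit, the class name ReadClusterCheckExample is hypothetical, and it assumes a Spark session has already been prepared (the test does this via SparkDDLTestUtils, omitted here).

import org.apache.kylin.engine.spark.source.NSparkMetadataExplorer;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.internal.SQLConf;

public class ReadClusterCheckExample {
    public static void main(String[] args) throws Exception {
        // Point Hive filesystem access at the read cluster; "hdfs://read" is the
        // placeholder URI used by the test, not a real deployment value.
        SparderEnv.getSparkSession().sessionState().conf()
                .setConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION(), "hdfs://read");
        // Fast check: lists the SSB database directory (and one table directory)
        // on the filesystem selected above.
        new NSparkMetadataExplorer().checkDatabaseHadoopAccessFast("SSB");
    }
}
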
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index b99b9727f7..64e65d7c73 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -172,10 +172,10 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
         val spark = SparderEnv.getSparkSession();
         try {
             String databaseLocation = spark.catalog().getDatabase(database).locationUri();
-            RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation);
+            RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation, false);
             if (tablesIterator.hasNext()) {
                 Path tablePath = tablesIterator.next().getPath();
-                getFilesIterator(tablePath.toString());
+                getFilesIterator(tablePath.toString(), true);
             }
         } catch (Exception e) {
             isAccess = false;
@@ -189,17 +189,21 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
         return isAccess;
     }
 
-    private RemoteIterator<FileStatus> getFilesIterator(String location) throws IOException {
-        String hiveSpecFsLocation = SparderEnv.getSparkSession().sessionState().conf()
-            .getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
-        FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
-            : HadoopUtil.getFileSystem(hiveSpecFsLocation);
-        if (location.startsWith(fs.getScheme()) || location.startsWith("/")) {
-            fs.listStatus(new Path(location));
-            return fs.listStatusIterator(new Path(location));
+    private RemoteIterator<FileStatus> getFilesIterator(String location, boolean checkList) throws IOException {
+        val sparkConf = SparderEnv.getSparkSession().sessionState().conf();
+        String hiveSpecFsLocation;
+        FileSystem fs;
+        if (sparkConf.contains("spark.sql.hive.specific.fs.location")) {
+            hiveSpecFsLocation = sparkConf.getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
+            location = location.replace("hdfs://hacluster", hiveSpecFsLocation);
+            fs = HadoopUtil.getFileSystem(hiveSpecFsLocation);
+            fs = HadoopUtil.getFileSystem(hiveSpecFsLocation);
         } else {
-            return HadoopUtil.getFileSystem(location).listStatusIterator(new 
Path(location));
+            fs = HadoopUtil.getFileSystem(location);
+        }
+        if (checkList) {
+            fs.listStatus(new Path(location));
         }
+        return fs.listStatusIterator(new Path(location));
     }
 
     @Override

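Outside the diff context, the filesystem selection introduced by the new getFilesIterator(location, checkList) can be sketched roughly as follows. This is a simplified restatement rather than the committed code: the class and method names are hypothetical, and it relies on the same Kylin HadoopUtil helpers and the forked-Spark SQLConf.HIVE_SPECIFIC_FS_LOCATION entry shown in the hunk above.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.internal.SQLConf;

public class ReadClusterLocationSketch {

    // Hypothetical stand-in for the private getFilesIterator(String, boolean) above.
    static RemoteIterator<FileStatus> listOnConfiguredCluster(String location, boolean checkList)
            throws IOException {
        SQLConf conf = SparderEnv.getSparkSession().sessionState().conf();
        FileSystem fs;
        if (conf.contains("spark.sql.hive.specific.fs.location")) {
            // Read/write separated cluster: the metastore records write-cluster locations
            // (prefixed hdfs://hacluster in this patch), but data is served from the read
            // cluster, so rewrite the prefix and open the read-cluster filesystem.
            String readClusterRoot = conf.getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
            location = location.replace("hdfs://hacluster", readClusterRoot);
            fs = HadoopUtil.getFileSystem(readClusterRoot);
        } else {
            // Single cluster: resolve the filesystem from the location itself.
            fs = HadoopUtil.getFileSystem(location);
        }
        if (checkList) {
            // Eager listing so access problems surface immediately in the caller's try/catch.
            fs.listStatus(new Path(location));
        }
        return fs.listStatusIterator(new Path(location));
    }
}

In the patch itself this logic stays private to NSparkMetadataExplorer: the access check in the first hunk calls it once for the database directory (checkList=false) and once more for the first table directory found there (checkList=true).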