This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2e4167126c04b6eec2948d30d328be2af80fc8ed
Author: ChenLiang <31469905+yab...@users.noreply.github.com>
AuthorDate: Thu Dec 8 17:03:34 2022 +0800

    KYLIN-5439 fix read/write cluster db location error
---
 .../apache/kylin/rest/service/SparkDDLTest.java   | 11 ++++++++-
 .../spark/source/NSparkMetadataExplorer.java      | 26 +++++++++++++---------
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
index fbdb21528f..d3a1ceba42 100644
--- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
@@ -21,11 +21,14 @@
 import java.util.List;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.engine.spark.source.NSparkMetadataExplorer;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.common.SparkDDLTestUtils;
+import org.apache.spark.sql.internal.SQLConf;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -79,7 +82,7 @@ public class SparkDDLTest extends NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testDDL() {
+    public void testDDL() throws Exception {
         try {
             assertKylinExeption(
                     () ->
@@ -127,6 +130,12 @@ public class SparkDDLTest extends NLocalFileMetadataTestCase {
             // ddl description
             List<List<String>> description = ddlService.pluginsDescription("ssb");
             Assert.assertTrue(description.size() > 0);
+
+
+            // read/write cluster
+            SparderEnv.getSparkSession().sessionState().conf()
+                    .setConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION(), "hdfs://read");
+            new NSparkMetadataExplorer().checkDatabaseHadoopAccessFast("SSB");
         } finally {
             SparkSession spark = SparderEnv.getSparkSession();
             if (spark != null && !spark.sparkContext().isStopped()) {
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index b99b9727f7..64e65d7c73 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -172,10 +172,10 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
         val spark = SparderEnv.getSparkSession();
         try {
             String databaseLocation = spark.catalog().getDatabase(database).locationUri();
-            RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation);
+            RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation, false);
             if (tablesIterator.hasNext()) {
                 Path tablePath = tablesIterator.next().getPath();
-                getFilesIterator(tablePath.toString());
+                getFilesIterator(tablePath.toString(), true);
             }
         } catch (Exception e) {
             isAccess = false;
@@ -189,17 +189,21 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
         return isAccess;
     }
 
-    private RemoteIterator<FileStatus> getFilesIterator(String location) throws IOException {
-        String hiveSpecFsLocation = SparderEnv.getSparkSession().sessionState().conf()
-                .getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
-        FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
-                : HadoopUtil.getFileSystem(hiveSpecFsLocation);
-        if (location.startsWith(fs.getScheme()) || location.startsWith("/")) {
-            fs.listStatus(new Path(location));
-            return fs.listStatusIterator(new Path(location));
+    private RemoteIterator<FileStatus> getFilesIterator(String location, boolean checkList) throws IOException {
+        val sparkConf = SparderEnv.getSparkSession().sessionState().conf();
+        String hiveSpecFsLocation;
+        FileSystem fs;
+        if (sparkConf.contains("spark.sql.hive.specific.fs.location")) {
+            hiveSpecFsLocation = sparkConf.getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
+            location = location.replace("hdfs://hacluster", hiveSpecFsLocation);
+            fs = HadoopUtil.getFileSystem(hiveSpecFsLocation);
         } else {
-            return HadoopUtil.getFileSystem(location).listStatusIterator(new Path(location));
+            fs = HadoopUtil.getFileSystem(location);
+        }
+        if (checkList) {
+            fs.listStatus(new Path(location));
         }
+        return fs.listStatusIterator(new Path(location));
     }
 
     @Override
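
For context on the fix: on a read/write-separated deployment, database locations
reported by the Hive metastore carry the write cluster's prefix (the hardcoded
"hdfs://hacluster" in the diff), so listing them against the read cluster's file
system fails. The patch rewrites that prefix to spark.sql.hive.specific.fs.location
before listing. Below is a minimal, self-contained sketch of that rewrite; the
LocationRewriteSketch class, rewriteLocation helper, and main() harness are
illustrative only and not part of this commit, while the config key and the
"hdfs://hacluster" prefix are taken from the diff above.

    // Sketch only -- not part of the commit. Mirrors the location rewrite that
    // getFilesIterator performs when a read-cluster file system is configured;
    // the class and method names here are hypothetical.
    public class LocationRewriteSketch {

        // Config key the patch checks before rewriting (from the diff above).
        static final String HIVE_SPECIFIC_FS_LOCATION_KEY = "spark.sql.hive.specific.fs.location";

        // If the read cluster's FS location is configured, swap the write
        // cluster's "hdfs://hacluster" prefix for it; otherwise leave the
        // metastore location unchanged.
        static String rewriteLocation(String location, String hiveSpecFsLocation) {
            if (hiveSpecFsLocation == null) {
                return location;
            }
            return location.replace("hdfs://hacluster", hiveSpecFsLocation);
        }

        public static void main(String[] args) {
            // With spark.sql.hive.specific.fs.location = "hdfs://read", as set in testDDL:
            System.out.println(rewriteLocation("hdfs://hacluster/user/hive/warehouse/ssb.db", "hdfs://read"));
            // prints: hdfs://read/user/hive/warehouse/ssb.db

            // Without the setting, the metastore location is used as-is:
            System.out.println(rewriteLocation("hdfs://hacluster/user/hive/warehouse/ssb.db", null));
            // prints: hdfs://hacluster/user/hive/warehouse/ssb.db
        }
    }

A note on the new checkList flag: fs.listStatus is eager, so passing true appears
to be intended to surface permission or connectivity errors during the access
check itself, whereas listStatusIterator alone may defer the failure until the
iterator is consumed.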