This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5edd07f3fd72e51d787a25e068ae5672c85bdaa6
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Thu Apr 13 10:39:23 2023 +0800

    KYLIN-5635 adapt for delta table
---
 pom.xml                                            |  4 +--
 .../kylin/rest/service/SparkSourceService.java     | 10 +++++--
 .../kylin/rest/service/SparkSourceServiceTest.java | 35 ++++++++++++++++++----
 .../spark/source/NSparkMetadataExplorer.java       | 16 +++++-----
 .../spark/source/NSparkTableMetaExplorer.java      | 12 +++++---
 .../scala/org/apache/spark/sql/DdlOperation.scala  | 17 ++++++-----
 6 files changed, 64 insertions(+), 30 deletions(-)
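
In short, the adaptation routes schema lookups through Spark's table() API whenever
the catalog entry is a Delta table, because the Hive metastore record for a Delta
table does not necessarily carry the real column list. A minimal sketch of that
pattern in Java (the helper class and method names below are illustrative only,
not code from this commit):

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.catalog.CatalogTable;
    import org.apache.spark.sql.delta.DeltaTableUtils;
    import org.apache.spark.sql.types.StructType;

    class DeltaSchemaResolver {
        // Resolve the column schema for a table already registered in the session catalog.
        static StructType resolveSchema(SparkSession spark, CatalogTable tableMetadata) {
            if (DeltaTableUtils.isDeltaTable(tableMetadata)) {
                // Delta table: read the live schema via the session instead of
                // trusting the (possibly placeholder) metastore schema.
                return spark.table(tableMetadata.identifier().unquotedString()).schema();
            }
            // Hive or plain datasource table: the catalog schema is authoritative.
            return tableMetadata.schema();
        }
    }

The same isDeltaTable check appears in SparkSourceService, NSparkTableMetaExplorer
and DdlOperation in the diff below.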

diff --git a/pom.xml b/pom.xml
index 826f9c96e1..a53b5cf2a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,8 +123,8 @@
         <kafka.version>2.8.2</kafka.version>
 
         <!-- Spark versions -->
-        <delta.version>1.2.1</delta.version>
-        <spark.version>3.2.0-kylin-4.6.8.0</spark.version>
+        <delta.version>2.0.2</delta.version>
+        <spark.version>3.2.0-kylin-4.6.8.0-SNAPSHOT</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
 
diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java
index 52d23b6c21..fbf3a1387e 100644
--- a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java
@@ -35,6 +35,9 @@ import org.apache.kylin.common.exception.ServerErrorCode;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringHelper;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.rest.request.DDLRequest;
@@ -54,14 +57,12 @@ import org.apache.spark.sql.catalog.Database;
 import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.delta.DeltaTableUtils;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.springframework.stereotype.Service;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.guava30.shaded.common.base.Strings;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
 
 import lombok.Data;
 import lombok.val;
@@ -165,6 +166,9 @@ public class SparkSourceService extends BasicService {
         CatalogTable catalogTable = sparkSession.sessionState().catalog()
                 .getTempViewOrPermanentTableMetadata(new TableIdentifier(table, Option.apply(db)));
         scala.collection.immutable.List<StructField> structFieldList = catalogTable.schema().toList();
+        if (DeltaTableUtils.isDeltaTable(catalogTable)) {
+            structFieldList = sparkSession.table(catalogTable.identifier()).schema().toList();
+        }
         Iterator<StructField> structFieldIterator = structFieldList.iterator();
 
         List<ColumnModel> columnModels = Lists.newArrayList();
diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
index a2232feff1..c90ee0f8da 100644
--- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
@@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.request.DDLRequest;
@@ -42,6 +43,8 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.execution.command.DDLUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,7 +52,7 @@ import org.junit.Test;
 import org.mockito.InjectMocks;
 import org.mockito.Mockito;
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import scala.Option;
 
 public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
 
@@ -62,7 +65,10 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         createTestMetadata();
-        ss = SparkSession.builder().appName("local").master("local[1]").enableHiveSupport().getOrCreate();
+        ss = SparkSession.builder().appName("local").master("local[1]")
+                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+                .enableHiveSupport().getOrCreate();
         ss.sparkContext().hadoopConfiguration().set("javax.jdo.option.ConnectionURL",
                 "jdbc:derby:memory:db;create=true");
         SparderEnv.setSparkSession(ss);
@@ -200,11 +206,14 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
     @Test
     public void testListColumns() {
         Assert.assertEquals(4, sparkSourceService.listColumns("default", "COUNTRY").size());
+        sparkSourceService.executeSQL(
+                "CREATE EXTERNAL TABLE delta_bigints_2(id bigint,str string) USING DELTA LOCATION '/tmp/delta_data_spark_2'");
+        Assert.assertEquals(2, sparkSourceService.listColumns("default", "delta_bigints_2").size());
 
     }
 
     @Test
-    public void testExportTables() {
+    public void testExportTables() throws Exception {
         // hive data source
         String expectedTableStructure = "CREATE EXTERNAL TABLE `default`.`hive_bigints`(   `id` BIGINT) "
                 + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' "
@@ -218,22 +227,38 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
                 .getTables().get("hive_bigints");
         Assert.assertEquals(actureTableStructure.substring(0, actureTableStructure.lastIndexOf("TBLPROPERTIES")),
                 expectedTableStructure);
-        Assert.assertTrue(DdlOperation.isHiveTable("default", "hive_bigints"));
+        CatalogTable tableMetadata = SparderEnv.getSparkSession().sessionState().catalog()
+                .getTableRawMetadata(new TableIdentifier("hive_bigints", Option.apply("default")));
+        Assert.assertTrue(DDLUtils.isHiveTable(tableMetadata));
 
         // spark datasource
         sparkSourceService.executeSQL(
                 "CREATE EXTERNAL TABLE spark_bigints(id bigint) USING PARQUET LOCATION '/tmp/parquet_data_spark'");
-        Assert.assertFalse(DdlOperation.isHiveTable("default", "spark_bigints"));
+        tableMetadata = SparderEnv.getSparkSession().sessionState().catalog()
+                .getTableRawMetadata(new TableIdentifier("spark_bigints", Option.apply("default")));
+        Assert.assertFalse(DDLUtils.isHiveTable(tableMetadata));
         String sparkDDL = sparkSourceService.exportTables("default", new String[] { "spark_bigints" }).getTables()
                 .get("spark_bigints");
         Assert.assertFalse(sparkDDL.isEmpty());
         Assert.assertTrue(StringUtils.containsIgnoreCase(sparkDDL, "USING PARQUET"));
 
+        // delta datasource
+        sparkSourceService.executeSQL(
+                "CREATE EXTERNAL TABLE delta_bigints(id bigint) USING DELTA LOCATION '/tmp/delta_data_spark'");
+        tableMetadata = SparderEnv.getSparkSession().sessionState().catalog()
+                .getTableRawMetadata(new TableIdentifier("delta_bigints", Option.apply("default")));
+        Assert.assertFalse(DDLUtils.isHiveTable(tableMetadata));
+        String deltaDDL = sparkSourceService.exportTables("default", new String[] { "delta_bigints" }).getTables()
+                .get("delta_bigints");
+        Assert.assertFalse(deltaDDL.isEmpty());
+        Assert.assertTrue(StringUtils.containsIgnoreCase(deltaDDL, "USING DELTA"));
+
         // view
         sparkSourceService.executeSQL("CREATE VIEW view_bigints as select id from default.spark_bigints");
         String viewDDL = DdlOperation.collectDDL(TableIdentifier.apply("view_bigints"),
                 "show create view default.view_bigints");
         Assert.assertFalse(StringUtils.isEmpty(viewDDL));
+
     }
 
     @Test
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index 20ab0c264c..31dcc1e5c5 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -41,6 +41,7 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
@@ -48,7 +49,6 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.source.ISampleDataDeployer;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -57,12 +57,10 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalog.Database;
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
 import org.apache.spark.sql.internal.SQLConf;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.clearspring.analytics.util.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 
 import lombok.val;
 
@@ -102,14 +100,14 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
     @Override
     public List<String> listDatabases() throws Exception {
         Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace");
-        List<String> databases =
-            dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
+        List<String> databases = dataset.collectAsList().stream().map(row -> row.getString(0))
+                .collect(Collectors.toList());
         if (KylinConfig.getInstanceFromEnv().isDDLLogicalViewEnabled()) {
             String logicalViewDB = KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB();
             databases.forEach(db -> {
                 if (db.equalsIgnoreCase(logicalViewDB)) {
-                    throw new KylinException(DDL_CHECK_ERROR, "Logical view database should not be duplicated "
-                        + "with normal hive database!!!");
+                    throw new KylinException(DDL_CHECK_ERROR,
+                            "Logical view database should not be duplicated " + "with normal hive database!!!");
                 }
             });
             List<String> databasesWithLogicalDB = Lists.newArrayList();
@@ -201,7 +199,7 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
             isAccess = false;
             try {
                 logger.error("Read hive database {} error:{}, ugi name: {}.", 
database, e.getMessage(),
-                    UserGroupInformation.getCurrentUser().getUserName());
+                        UserGroupInformation.getCurrentUser().getUserName());
             } catch (IOException ex) {
                 logger.error("fetch user curr ugi info error.", e);
             }
@@ -318,7 +316,7 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
     public boolean checkDatabaseAccess(String database) throws Exception {
         boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
         String viewDB = KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB();
-        if(viewDB.equalsIgnoreCase(database)){
+        if (viewDB.equalsIgnoreCase(database)) {
             return true;
         }
         if (hiveDBAccessFilterEnable) {
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
index 7f25bfde1b..332b2c20b3 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
@@ -28,18 +28,18 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.jnet.Installer;
 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.delta.DeltaTableUtils;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import scala.Option;
 import scala.collection.JavaConversions;
 
@@ -111,7 +111,11 @@ public class NSparkTableMetaExplorer implements Serializable {
     private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) {
         NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder();
         builder.setTableName(tableName);
-        builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema()));
+        StructType allColSchema = tableMetadata.schema();
+        if (DeltaTableUtils.isDeltaTable(tableMetadata)) {
+            allColSchema = SparderEnv.getSparkSession().table(tableMetadata.identifier()).schema();
+        }
+        }
+        builder.setAllColumns(getColumns(tableMetadata, allColSchema));
         builder.setOwner(tableMetadata.owner());
         builder.setCreateTime(tableMetadata.createTime() + "");
         builder.setLastAccessTime(tableMetadata.lastAccessTime() + "");
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
index f1db6006b1..125407dd10 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala
@@ -26,9 +26,11 @@ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdenti
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, QueryExecution, SparkPlan}
 import org.apache.spark.sql.types.StructField
-
 import java.lang.{String => JString}
 import java.util.{List => JList}
+
+import org.apache.spark.sql.delta.DeltaTableUtils
+
 import scala.collection.JavaConverters._
 
 
@@ -108,7 +110,13 @@ object DdlOperation extends Logging {
 
   def getTableDesc(database: String, table: String): String = {
     var sql = s"SHOW CREATE TABLE $database.$table"
-    sql = if (isHiveTable(database, table)) sql + " AS SERDE" else sql
+    var tableMetadata = SparderEnv.getSparkSession.sessionState.catalog
+      .getTableRawMetadata(TableIdentifier(table, Some(database)))
+    if (DeltaTableUtils.isDeltaTable(tableMetadata)) {
+      return  new ShowCreateTableCommand(TableIdentifier(table, Some(database)), Seq.empty).
+        run(SparderEnv.getSparkSession).toList.take(1).head.getString(0);
+    }
+    sql = if (DDLUtils.isHiveTable(tableMetadata)) sql + " AS SERDE" else sql
     val logicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(sql)
     val queryExecution: QueryExecution = SparderEnv.getSparkSession.sessionState.executePlan(logicalPlan,
       CommandExecutionMode.SKIP)
@@ -118,11 +126,6 @@ object DdlOperation extends Logging {
     }
   }
 
-  def isHiveTable(database: String, table: String): Boolean = {
-    val tableMetadata = SparderEnv.getSparkSession.sessionState.catalog
-      .getTableRawMetadata(TableIdentifier(table, Some(database)))
-    !DDLUtils.isDatasourceTable(tableMetadata)
-  }
 
   def collectDDL(tableIdentifier: TableIdentifier, sql: String): String = {
     val catalog: SessionCatalog = SparderEnv.getSparkSession.sessionState.catalog
