This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5edd07f3fd72e51d787a25e068ae5672c85bdaa6
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Thu Apr 13 10:39:23 2023 +0800

    KYLIN-5635 adapt for delta table
---
 pom.xml                                            |  4 +--
 .../kylin/rest/service/SparkSourceService.java     | 10 +++++--
 .../kylin/rest/service/SparkSourceServiceTest.java | 35 ++++++++++++++++++----
 .../spark/source/NSparkMetadataExplorer.java       | 16 +++++-----
 .../spark/source/NSparkTableMetaExplorer.java      | 12 +++++---
 .../scala/org/apache/spark/sql/DdlOperation.scala  | 17 ++++++-----
 6 files changed, 64 insertions(+), 30 deletions(-)

diff --git a/pom.xml b/pom.xml
index 826f9c96e1..a53b5cf2a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,8 +123,8 @@
     <kafka.version>2.8.2</kafka.version>

     <!-- Spark versions -->
-    <delta.version>1.2.1</delta.version>
-    <spark.version>3.2.0-kylin-4.6.8.0</spark.version>
+    <delta.version>2.0.2</delta.version>
+    <spark.version>3.2.0-kylin-4.6.8.0-SNAPSHOT</spark.version>

     <roaring.version>0.9.2-kylin-r4</roaring.version>

diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java
index 52d23b6c21..fbf3a1387e 100644
--- a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkSourceService.java
@@ -35,6 +35,9 @@ import org.apache.kylin.common.exception.ServerErrorCode;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringHelper;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.rest.request.DDLRequest;
@@ -54,14 +57,12 @@ import org.apache.spark.sql.catalog.Database;
 import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.delta.DeltaTableUtils;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.springframework.stereotype.Service;

 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.guava30.shaded.common.base.Strings;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;

 import lombok.Data;
 import lombok.val;
@@ -165,6 +166,9 @@ public class SparkSourceService extends BasicService {
         CatalogTable catalogTable = sparkSession.sessionState().catalog()
                 .getTempViewOrPermanentTableMetadata(new TableIdentifier(table, Option.apply(db)));
         scala.collection.immutable.List<StructField> structFieldList = catalogTable.schema().toList();
+        if (DeltaTableUtils.isDeltaTable(catalogTable)) {
+            structFieldList = sparkSession.table(catalogTable.identifier()).schema().toList();
+        }
         Iterator<StructField> structFieldIterator = structFieldList.iterator();

         List<ColumnModel> columnModels = Lists.newArrayList();
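Note on the SparkSourceService change above: for a table created with USING DELTA, the schema recorded in the Hive metastore's CatalogTable is typically just a placeholder, since Delta keeps the authoritative schema in its transaction log, so the column listing has to resolve the table through the session instead. Below is a minimal sketch of that idea; the helper name resolveSchema is illustrative only, and where the patch gates on DeltaTableUtils.isDeltaTable, the sketch checks the table's provider instead:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.types.StructType

    // Resolve a table's real schema: trust the metastore for ordinary tables,
    // but re-read Delta tables through the session so the transaction log wins.
    def resolveSchema(spark: SparkSession, db: String, table: String): StructType = {
      val meta = spark.sessionState.catalog
        .getTempViewOrPermanentTableMetadata(TableIdentifier(table, Some(db)))
      if (meta.provider.exists(_.equalsIgnoreCase("delta"))) {
        spark.table(meta.identifier.quotedString).schema // consults the Delta log
      } else {
        meta.schema // metastore schema is authoritative for hive/parquet tables
      }
    }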
diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
index a2232feff1..c90ee0f8da 100644
--- a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkSourceServiceTest.java
@@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.request.DDLRequest;
@@ -42,6 +43,8 @@ import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.execution.command.DDLUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,7 +52,7 @@ import org.junit.Test;
 import org.mockito.InjectMocks;
 import org.mockito.Mockito;

-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import scala.Option;

 public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {

@@ -62,7 +65,10 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
     @Before
     public void setUp() throws Exception {
         createTestMetadata();
-        ss = SparkSession.builder().appName("local").master("local[1]").enableHiveSupport().getOrCreate();
+        ss = SparkSession.builder().appName("local").master("local[1]")
+                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+                .enableHiveSupport().getOrCreate();
         ss.sparkContext().hadoopConfiguration().set("javax.jdo.option.ConnectionURL",
                 "jdbc:derby:memory:db;create=true");
         SparderEnv.setSparkSession(ss);
@@ -200,11 +206,14 @@ public class SparkSourceServiceTest extends NLocalFileMetadataTestCase {
     @Test
     public void testListColumns() {
         Assert.assertEquals(4, sparkSourceService.listColumns("default", "COUNTRY").size());
+        sparkSourceService.executeSQL(
+                "CREATE EXTERNAL TABLE delta_bigints_2(id bigint,str string) USING DELTA LOCATION '/tmp/delta_data_spark_2'");
+        Assert.assertEquals(2, sparkSourceService.listColumns("default", "delta_bigints_2").size());
     }

     @Test
-    public void testExportTables() {
+    public void testExportTables() throws Exception {
         // hive data source
         String expectedTableStructure = "CREATE EXTERNAL TABLE `default`.`hive_bigints`( `id` BIGINT) "
                 + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' "
@@ -218,22 +227,38 @@
                 .getTables().get("hive_bigints");
         Assert.assertEquals(actureTableStructure.substring(0, actureTableStructure.lastIndexOf("TBLPROPERTIES")),
                 expectedTableStructure);
-        Assert.assertTrue(DdlOperation.isHiveTable("default", "hive_bigints"));
+        CatalogTable tableMetadata = SparderEnv.getSparkSession().sessionState().catalog()
+                .getTableRawMetadata(new TableIdentifier("hive_bigints", Option.apply("default")));
+        Assert.assertTrue(DDLUtils.isHiveTable(tableMetadata));

         // spark datasource
         sparkSourceService.executeSQL(
                 "CREATE EXTERNAL TABLE spark_bigints(id bigint) USING PARQUET LOCATION '/tmp/parquet_data_spark'");
-        Assert.assertFalse(DdlOperation.isHiveTable("default", "spark_bigints"));
+        tableMetadata = SparderEnv.getSparkSession().sessionState().catalog()
+                .getTableRawMetadata(new TableIdentifier("spark_bigints", Option.apply("default")));
+        Assert.assertFalse(DDLUtils.isHiveTable(tableMetadata));
         String sparkDDL = sparkSourceService.exportTables("default", new String[] { "spark_bigints" }).getTables()
                 .get("spark_bigints");
         Assert.assertFalse(sparkDDL.isEmpty());
         Assert.assertTrue(StringUtils.containsIgnoreCase(sparkDDL, "USING PARQUET"));

+        // delta datasource
+        sparkSourceService.executeSQL(
+                "CREATE EXTERNAL TABLE delta_bigints(id bigint) USING DELTA LOCATION '/tmp/delta_data_spark'");
+        tableMetadata = SparderEnv.getSparkSession().sessionState().catalog()
+                .getTableRawMetadata(new TableIdentifier("delta_bigints", Option.apply("default")));
+        Assert.assertFalse(DDLUtils.isHiveTable(tableMetadata));
+        String deltaDDL = sparkSourceService.exportTables("default", new String[] { "delta_bigints" }).getTables()
+                .get("delta_bigints");
+        Assert.assertFalse(deltaDDL.isEmpty());
+        Assert.assertTrue(StringUtils.containsIgnoreCase(deltaDDL, "USING DELTA"));
+
         // view
         sparkSourceService.executeSQL("CREATE VIEW view_bigints as select id from default.spark_bigints");
         String viewDDL = DdlOperation.collectDDL(TableIdentifier.apply("view_bigints"),
                 "show create view default.view_bigints");
         Assert.assertFalse(StringUtils.isEmpty(viewDDL));
+
     }

     @Test
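The two .config(...) settings added to setUp are the standard Delta Lake wiring: the SQL extension installs Delta's parser and analyzer rules, and DeltaCatalog wraps the built-in spark_catalog so that USING DELTA is understood. A short sketch of the same wiring in Scala, with a throwaway table; the table name and /tmp path are illustrative:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("local").master("local[1]")
      // Delta's session extension: adds Delta DDL/DML parsing and analysis rules
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      // Delegating catalog so the session catalog can resolve Delta tables
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    spark.sql("CREATE TABLE demo_delta (id BIGINT) USING DELTA LOCATION '/tmp/demo_delta'")
    spark.table("demo_delta").printSchema() // schema comes from the Delta log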
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index 20ab0c264c..31dcc1e5c5 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -41,6 +41,7 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
@@ -48,7 +49,6 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.source.ISampleDataDeployer;
 import org.apache.kylin.source.ISourceMetadataExplorer;
-
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -57,12 +57,10 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalog.Database;
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
 import org.apache.spark.sql.internal.SQLConf;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.clearspring.analytics.util.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;

 import lombok.val;

@@ -102,14 +100,14 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
     @Override
     public List<String> listDatabases() throws Exception {
         Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace");
-        List<String> databases =
-                dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
+        List<String> databases = dataset.collectAsList().stream().map(row -> row.getString(0))
+                .collect(Collectors.toList());
         if (KylinConfig.getInstanceFromEnv().isDDLLogicalViewEnabled()) {
             String logicalViewDB = KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB();
             databases.forEach(db -> {
                 if (db.equalsIgnoreCase(logicalViewDB)) {
-                    throw new KylinException(DDL_CHECK_ERROR, "Logical view database should not be duplicated "
-                            + "with normal hive database!!!");
+                    throw new KylinException(DDL_CHECK_ERROR,
+                            "Logical view database should not be duplicated " + "with normal hive database!!!");
                 }
             });
             List<String> databasesWithLogicalDB = Lists.newArrayList();
@@ -201,7 +199,7 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
             isAccess = false;
             try {
                 logger.error("Read hive database {} error:{}, ugi name: {}.", database, e.getMessage(),
-                    UserGroupInformation.getCurrentUser().getUserName());
+                        UserGroupInformation.getCurrentUser().getUserName());
             } catch (IOException ex) {
                 logger.error("fetch user curr ugi info error.", e);
             }
@@ -318,7 +316,7 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
     public boolean checkDatabaseAccess(String database) throws Exception {
         boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
         String viewDB = KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB();
-        if(viewDB.equalsIgnoreCase(database)){
+        if (viewDB.equalsIgnoreCase(database)) {
             return true;
         }
         if (hiveDBAccessFilterEnable) {
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
index 7f25bfde1b..332b2c20b3 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
@@ -28,18 +28,18 @@ import java.util.stream.Collectors;

 import org.apache.commons.jnet.Installer;
 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.delta.DeltaTableUtils;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import scala.Option;
 import scala.collection.JavaConversions;

@@ -111,7 +111,11 @@ public class NSparkTableMetaExplorer implements Serializable {
     private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) {
         NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder();
         builder.setTableName(tableName);
-        builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema()));
+        StructType allColSchema = tableMetadata.schema();
+        if (DeltaTableUtils.isDeltaTable(tableMetadata)) {
+            allColSchema = SparderEnv.getSparkSession().table(tableMetadata.identifier()).schema();
+        }
+        builder.setAllColumns(getColumns(tableMetadata, allColSchema));
         builder.setOwner(tableMetadata.owner());
         builder.setCreateTime(tableMetadata.createTime() + "");
         builder.setLastAccessTime(tableMetadata.lastAccessTime() + "");
""); builder.setLastAccessTime(tableMetadata.lastAccessTime() + ""); diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala index f1db6006b1..125407dd10 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/DdlOperation.scala @@ -26,9 +26,11 @@ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdenti import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, QueryExecution, SparkPlan} import org.apache.spark.sql.types.StructField - import java.lang.{String => JString} import java.util.{List => JList} + +import org.apache.spark.sql.delta.DeltaTableUtils + import scala.collection.JavaConverters._ @@ -108,7 +110,13 @@ object DdlOperation extends Logging { def getTableDesc(database: String, table: String): String = { var sql = s"SHOW CREATE TABLE $database.$table" - sql = if (isHiveTable(database, table)) sql + " AS SERDE" else sql + var tableMetadata = SparderEnv.getSparkSession.sessionState.catalog + .getTableRawMetadata(TableIdentifier(table, Some(database))) + if (DeltaTableUtils.isDeltaTable(tableMetadata)) { + return new ShowCreateTableCommand(TableIdentifier(table, Some(database)), Seq.empty). + run(SparderEnv.getSparkSession).toList.take(1).head.getString(0); + } + sql = if (DDLUtils.isHiveTable(tableMetadata)) sql + " AS SERDE" else sql val logicalPlan = SparderEnv.getSparkSession.sessionState.sqlParser.parsePlan(sql) val queryExecution: QueryExecution = SparderEnv.getSparkSession.sessionState.executePlan(logicalPlan, CommandExecutionMode.SKIP) @@ -118,11 +126,6 @@ object DdlOperation extends Logging { } } - def isHiveTable(database: String, table: String): Boolean = { - val tableMetadata = SparderEnv.getSparkSession.sessionState.catalog - .getTableRawMetadata(TableIdentifier(table, Some(database))) - !DDLUtils.isDatasourceTable(tableMetadata) - } def collectDDL(tableIdentifier: TableIdentifier, sql: String): String = { val catalog: SessionCatalog = SparderEnv.getSparkSession.sessionState.catalog