This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7ddcd87169d4e5abeb196fbdcc8fce28a5f3466a
Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com>
AuthorDate: Tue Mar 21 16:46:58 2023 +0800

    KYLIN-5574 Fix the error of building models when KE's metadata is inconsistent with the Hive metastore's
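
    The fix reads only the "effective" columns, i.e. the Kylin columns that
    still exist in the source table schema, instead of generating a SELECT
    over columns the Hive metastore no longer knows about. A minimal sketch
    of the idea, simplified from the NSparkCubingSourceInput change in the
    patch below (imports as in that file):

        // Simplified sketch: keep only non-computed Kylin columns that the
        // source table still provides; the real patch also logs a warning
        // for each missing column.
        private List<ColumnDesc> extractEffectiveColumns(TableDesc table, SparkSession ss) {
            Set<String> sourceCols = Arrays.stream(ss.table(table.getTableAlias()).columns())
                    .map(String::toUpperCase).collect(Collectors.toSet());
            return Arrays.stream(table.getColumns())
                    .filter(col -> !col.isComputedColumn() && sourceCols.contains(col.getName()))
                    .collect(Collectors.toList());
        }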
---
 .../kylin/engine/spark/mockup/CsvSource.java       |  4 +-
 .../spark/source/NSparkCubingSourceInput.java      | 94 +++++++++++++++-------
 .../kylin/engine/spark/job/TableAnalysisJob.scala  | 23 ++++++
 .../job/stage/build/FlatTableAndDictBase.scala     | 14 +---
 .../org/apache/spark/application/JobMonitor.scala  |  7 +-
 .../engine/spark/job/NTableSamplingJobTest.java    | 60 ++++++++++++++
 6 files changed, 159 insertions(+), 43 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java
index 0932d8b485..fefa6c6bed 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java
@@ -73,8 +73,8 @@ public class CsvSource implements ISource {
 
                 @Override
                 public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> parameters) {
-                    if (KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB()
-                        .equalsIgnoreCase(table.getDatabase())) {
+                    if ("null".equalsIgnoreCase(table.getDatabase())
+                            || KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB().equalsIgnoreCase(table.getDatabase())) {
                       return new NSparkCubingSourceInput().getSourceData(table, ss, parameters);
                     }
                     String path = new File(getUtMetaDir(), "data/" + table.getIdentity() + ".csv").getAbsolutePath();
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
index 43358a93d1..fe31888d74 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
@@ -17,18 +17,12 @@
  */
 package org.apache.kylin.engine.spark.source;
 
-import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
 import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -37,30 +31,62 @@ import org.apache.spark.sql.util.SparderTypeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.base.Joiner;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.avatica.util.Quoting.BACK_TICK;
+import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
 
 public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource {
+
     private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);
 
     @Override
     public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
-        ColumnDesc[] columnDescs = table.getColumns();
-        List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length);
-        StructType kylinSchema = new StructType();
-        for (ColumnDesc columnDesc : columnDescs) {
-            if (!columnDesc.isComputedColumn()) {
-                kylinSchema = kylinSchema.add(columnDesc.getName(),
-                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
-                tblColNames.add("`" + columnDesc.getName() + "`");
-            }
-        }
-        String[] colNames = tblColNames.toArray(new String[0]);
-        String colString = Joiner.on(",").join(colNames);
-        String sql;
         KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig();
         logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}",
                 table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled());
+
+        // Extract effective columns, i.e. columns that exist in both Kylin metadata and the source table
+        List<ColumnDesc> effectiveColumns = extractEffectiveColumns(table, ss);
+        String sql = generateSelectSql(table, effectiveColumns, params, kylinConfig);
+        StructType kylinSchema = generateKylinSchema(effectiveColumns);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Source data sql is: {}", sql);
+            logger.debug("Kylin schema: {}", kylinSchema.treeString());
+        }
+        Dataset<Row> df = ss.sql(sql);
+        StructType sparkSchema = df.schema();
+
+        // Caution: sparkSchema and kylinSchema should keep the same column order before aligning
+        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
+    }
+
+    private List<ColumnDesc> extractEffectiveColumns(TableDesc table, SparkSession ss) {
+        List<ColumnDesc> ret = new ArrayList<>();
+        Dataset<Row> sourceTableDS = ss.table(table.getTableAlias());
+        Set<String> sourceTableColumns = Arrays.stream(sourceTableDS.columns()).map(String::toUpperCase)
+                .collect(Collectors.toSet());
+        for (ColumnDesc col : table.getColumns()) {
+            if (!col.isComputedColumn()) {
+                if (sourceTableColumns.contains(col.getName())) {
+                    ret.add(col);
+                } else {
+                    logger.warn("Table {} missing column {} in source schema", table.getTableAlias(), col.getName());
+                }
+            }
+        }
+        return ret;
+    }
+
+    private String generateSelectSql(TableDesc table, List<ColumnDesc> effectiveColumns, Map<String, String> params, KylinConfig kylinConfig) {
+        String colString = generateColString(effectiveColumns);
+        String sql;
         if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(),
                 kylinConfig.isReadTransactionalTableEnabled())) {
             sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString,
@@ -68,10 +94,22 @@ public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingS
         } else {
             sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getBackTickIdentity());
         }
-        Dataset<Row> df = ss.sql(sql);
-        StructType sparkSchema = df.schema();
-        logger.debug("Source data sql is: {}", sql);
-        logger.debug("Kylin schema: {}", kylinSchema.treeString());
-        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
+        return sql;
+    }
+
+    private String generateColString(List<ColumnDesc> effectiveColumns) {
+        return effectiveColumns.stream().map(col -> BACK_TICK.string + col.getName() + BACK_TICK.string)
+                .collect(Collectors.joining(","));
+    }
+
+    private StructType generateKylinSchema(List<ColumnDesc> effectiveColumns) {
+        StructType kylinSchema = new StructType();
+        for (ColumnDesc columnDesc : effectiveColumns) {
+            if (!columnDesc.isComputedColumn()) {
+                kylinSchema = kylinSchema.add(columnDesc.getName(),
+                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
+            }
+        }
+        return kylinSchema;
     }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
index 6bdfe5ddcb..e091bb5b85 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
@@ -27,6 +27,7 @@ import org.apache.kylin.engine.spark.utils.SparkConfHelper
 import org.apache.kylin.metadata.model.TableDesc
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.source.SourceFactory
+import org.apache.spark.application.NoRetryException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrameEnhancement._
 import org.apache.spark.sql._
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.functions._
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class TableAnalysisJob(tableDesc: TableDesc,
                        project: String,
@@ -60,6 +62,10 @@ class TableAnalysisJob(tableDesc: TableDesc,
       .getSourceData(tableDesc, ss, params)
       .coalesce(numPartitions)
 
+    if (!checkColumns(tableDesc, dataFrame)) {
+      throw new NoRetryException("Source table missing columns. Please reload table before sampling.")
+    }
+
     calculateViewMetasIfNeeded(tableDesc.getBackTickIdentity)
 
     val dat = dataFrame.localLimit(rowsTakenInEachPartition)
@@ -114,4 +120,21 @@ class TableAnalysisJob(tableDesc: TableDesc,
     ).toList
   }
 
+  def checkColumns(tableDesc: TableDesc, ds: Dataset[Row]): Boolean = {
+    logInfo(s"Check columns for table ${tableDesc.getIdentity}")
+    val sourceColumnSet: Set[String] = ds.columns.map(col => col.toUpperCase).toSet
+    val nonExistColumns: mutable.Set[String] = mutable.Set()
+    for (col <- tableDesc.getColumns) {
+      if (!col.isComputedColumn && !sourceColumnSet.contains(col.getName)) {
+        nonExistColumns.add(col.getName)
+      }
+    }
+    if (nonExistColumns.nonEmpty) {
+      logError(s"Check columns for table ${tableDesc.getIdentity} failed, missing following columns: ${nonExistColumns}")
+    } else {
+      logInfo(s"Check columns for table ${tableDesc.getIdentity} good")
+    }
+    nonExistColumns.isEmpty
+  }
+
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index eaff79e101..3117741c9e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,9 +18,6 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -33,10 +30,11 @@ import org.apache.kylin.engine.spark.job.NSparkCubingUtil.convertFromDot
 import org.apache.kylin.engine.spark.job.stage.{BuildParam, StageExec}
 import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob, TableMetaManager}
 import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
 import org.apache.kylin.engine.spark.model.planner.{CuboIdToLayoutUtils, FlatTableToCostUtils}
+import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
+import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
@@ -50,12 +48,6 @@ import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
 
 import java.math.BigInteger
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
-import org.apache.kylin.common.constant.LogConstant
-import org.apache.kylin.common.logging.SetLogCategory
-
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
@@ -65,8 +57,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
-import org.apache.kylin.guava30.shaded.common.collect.Sets
-
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
index 98a5d7e5da..35b1c9f6d0 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
@@ -21,6 +21,7 @@ package org.apache.spark.application
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
 import org.apache.kylin.engine.spark.scheduler._
 import io.netty.util.internal.ThrowableUtil
+import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.common.util.Unsafe
 import org.apache.spark.autoheal.ExceptionTerminator
 import org.apache.spark.internal.Logging
@@ -80,7 +81,11 @@ class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging {
   }
 
   def handleUnknownThrowable(ur: UnknownThrowable): Unit = {
-    eventLoop.post(JobFailed("Unknown error occurred during the job.", ur.throwable))
+    var msg = "Unknown error occurred during the job."
+    if (ur.throwable != null && StringUtils.isNotBlank(ur.throwable.getMessage)) {
+      msg = ur.throwable.getMessage
+    }
+    eventLoop.post(JobFailed(msg, ur.throwable))
   }
 }
 
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java
index a041825fe4..7e058d08f8 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java
@@ -23,13 +23,19 @@ import lombok.var;
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.job.dao.JobStatisticsManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.After;
 import org.junit.Assert;
@@ -38,6 +44,8 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
@@ -105,6 +113,58 @@ public class NTableSamplingJobTest extends NLocalWithSparkSessionTest {
         Assert.assertEquals(samplingJob.getCreateTime(), tableExt.getCreateTime());
     }
 
+    @Test
+    public void testTableSamplingJobFailed_withCheckColumnsError() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final String TABLE_SAMPLING_TEST_TMP_DB = "TABLE_SAMPLING_TEST_TMP_DB";
+        overwriteSystemProp("kylin.source.ddl.logical-view.database", TABLE_SAMPLING_TEST_TMP_DB);
+
+        final String tableName = "DEFAULT.TEST_KYLIN_FACT";
+        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(config, PROJECT);
+        final TableDesc tableDesc = tableMgr.getTableDesc(tableName);
+        Assert.assertNotNull(tableDesc);
+
+        ColumnDesc[] columns = tableDesc.getColumns();
+        ColumnDesc[] columnsModified = Arrays.copyOfRange(columns, 0, columns.length + 2);
+        columnsModified[columnsModified.length - 2] = new ColumnDesc(
+                "13",
+                "A_CC_COL",
+                "boolean",
+                "",
+                "true|false|TRUE|FALSE|True|False",
+                null,
+                "non-empty expr");
+        columnsModified[columnsModified.length - 1] = new ColumnDesc(
+                "14",
+                "A_NON_EXIST_COL",
+                "boolean",
+                "",
+                "true|false|TRUE|FALSE|True|False",
+                null,
+                null);
+        TableDesc tableDescModified = tableMgr.copyForWrite(tableDesc);
+        tableDescModified.setDatabase(null);
+        tableDescModified.setColumns(columnsModified);
+        tableDescModified.setMvcc(-1L);
+        tableMgr.saveSourceTable(tableDescModified);
+
+        Map<String, String> params = NProjectManager.getInstance(config)
+                .getProject(PROJECT).getLegalOverrideKylinProps();
+        Dataset<Row> dataFrame = SourceFactory
+                .createEngineAdapter(tableDesc, NSparkCubingEngine.NSparkCubingSource.class)
+                .getSourceData(tableDesc, ss, params)
+                .coalesce(1);
+        dataFrame.createOrReplaceTempView(tableDescModified.getName());
+
+        NExecutableManager execMgr = NExecutableManager.getInstance(config, PROJECT);
+        val samplingJob = NTableSamplingJob.create(tableDescModified, PROJECT, "ADMIN", 20_000_000);
+        execMgr.addJob(samplingJob);
+        Assert.assertEquals(ExecutableState.READY, samplingJob.getStatus());
+
+        await().atMost(60000, TimeUnit.MINUTES).until(() -> !execMgr.getJob(samplingJob.getId()).getStatus().isProgressing());
+        Assert.assertEquals(ExecutableState.ERROR, samplingJob.getStatus());
+    }
+
     @Test
     public void testTableSamplingJobWithS3Role() {
         getTestConfig().setProperty("kylin.env.use-dynamic-S3-role-credential-in-table", "true");
