This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7ddcd87169d4e5abeb196fbdcc8fce28a5f3466a
Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com>
AuthorDate: Tue Mar 21 16:46:58 2023 +0800

    KYLIN-5574 Fix the error of building models when KE's metadata is inconsistent with the Hive metastore's.
---
 .../kylin/engine/spark/mockup/CsvSource.java       |  4 +-
 .../spark/source/NSparkCubingSourceInput.java      | 94 +++++++++++++++-------
 .../kylin/engine/spark/job/TableAnalysisJob.scala  | 23 ++++++
 .../job/stage/build/FlatTableAndDictBase.scala     | 14 +---
 .../org/apache/spark/application/JobMonitor.scala  |  7 +-
 .../engine/spark/job/NTableSamplingJobTest.java    | 60 ++++++++++++++
 6 files changed, 159 insertions(+), 43 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java
index 0932d8b485..fefa6c6bed 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/mockup/CsvSource.java
@@ -73,8 +73,8 @@ public class CsvSource implements ISource {

     @Override
     public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> parameters) {
-        if (KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB()
-                .equalsIgnoreCase(table.getDatabase())) {
+        if ("null".equalsIgnoreCase(table.getDatabase())
+                || KylinConfig.getInstanceFromEnv().getDDLLogicalViewDB().equalsIgnoreCase(table.getDatabase())) {
             return new NSparkCubingSourceInput().getSourceData(table, ss, parameters);
         }
         String path = new File(getUtMetaDir(), "data/" + table.getIdentity() + ".csv").getAbsolutePath();
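The NSparkCubingSourceInput change below only selects columns that exist in both Kylin's table metadata and the live source schema, so a stale TableDesc no longer produces a failing SELECT. A minimal sketch of that idea, assuming nothing beyond the Spark APIs already used in the diff (the class and method names here are illustrative, not Kylin code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.spark.sql.SparkSession;

    public class EffectiveColumnSketch {

        // Keep only the Kylin-declared column names that the source table actually has.
        // Names are compared upper-cased, mirroring extractEffectiveColumns() in the diff below.
        public static List<String> effectiveColumns(SparkSession ss, String tableName, List<String> kylinColumns) {
            Set<String> sourceColumns = Arrays.stream(ss.table(tableName).columns())
                    .map(String::toUpperCase)
                    .collect(Collectors.toSet());
            return kylinColumns.stream()
                    .filter(name -> sourceColumns.contains(name.toUpperCase()))
                    .collect(Collectors.toList());
        }
    }

In the real change, the resulting column list drives both the generated "select ... from ..." statement and the Kylin-side StructType, so the two stay in the same column order when the schemas are aligned.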
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
index 43358a93d1..fe31888d74 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
@@ -17,18 +17,12 @@
  */
 package org.apache.kylin.engine.spark.source;

-import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
 import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -37,30 +31,62 @@ import org.apache.spark.sql.util.SparderTypeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.kylin.guava30.shaded.common.base.Joiner;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.avatica.util.Quoting.BACK_TICK;
+import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;

 public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);

     @Override
     public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
-        ColumnDesc[] columnDescs = table.getColumns();
-        List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length);
-        StructType kylinSchema = new StructType();
-        for (ColumnDesc columnDesc : columnDescs) {
-            if (!columnDesc.isComputedColumn()) {
-                kylinSchema = kylinSchema.add(columnDesc.getName(),
-                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
-                tblColNames.add("`" + columnDesc.getName() + "`");
-            }
-        }
-        String[] colNames = tblColNames.toArray(new String[0]);
-        String colString = Joiner.on(",").join(colNames);
-        String sql;
         KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig();
         logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}",
                 table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled());
+
+        // Extract effective columns which column exists in both kylin and source table
+        List<ColumnDesc> effectiveColumns = extractEffectiveColumns(table, ss);
+        String sql = generateSelectSql(table, effectiveColumns, params, kylinConfig);
+        StructType kylinSchema = generateKylinSchema(effectiveColumns);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Source data sql is: {}", sql);
+            logger.debug("Kylin schema: {}", kylinSchema.treeString());
+        }
+        Dataset<Row> df = ss.sql(sql);
+        StructType sparkSchema = df.schema();
+
+        // Caution: sparkSchema and kylinSchema should keep same column order before aligning
+        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
+    }
+
+    private List<ColumnDesc> extractEffectiveColumns(TableDesc table, SparkSession ss) {
+        List<ColumnDesc> ret = new ArrayList<>();
+        Dataset<Row> sourceTableDS = ss.table(table.getTableAlias());
+        Set<String> sourceTableColumns = Arrays.stream(sourceTableDS.columns()).map(String::toUpperCase)
+                .collect(Collectors.toSet());
+        for (ColumnDesc col : table.getColumns()) {
+            if (!col.isComputedColumn()) {
+                if (sourceTableColumns.contains(col.getName())) {
+                    ret.add(col);
+                } else {
+                    logger.warn("Table {} missing column {} in source schema", table.getTableAlias(), col.getName());
+                }
+            }
+        }
+        return ret;
+    }
+
+    private String generateSelectSql(TableDesc table, List<ColumnDesc> effectiveColumns, Map<String, String> params, KylinConfig kylinConfig) {
+        String colString = generateColString(effectiveColumns);
+        String sql;
         if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(),
                 kylinConfig.isReadTransactionalTableEnabled())) {
             sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString,
@@ -68,10 +94,22 @@ public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingS
         } else {
             sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getBackTickIdentity());
         }
-        Dataset<Row> df = ss.sql(sql);
-        StructType sparkSchema = df.schema();
-        logger.debug("Source data sql is: {}", sql);
-        logger.debug("Kylin schema: {}", kylinSchema.treeString());
-        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
+        return sql;
+    }
+
+    private String generateColString(List<ColumnDesc> effectiveColumns) {
+        return effectiveColumns.stream().map(col -> BACK_TICK.string + col.getName() + BACK_TICK.string)
+                .collect(Collectors.joining(","));
+    }
+
+    private StructType generateKylinSchema(List<ColumnDesc> effectiveColumns) {
+        StructType kylinSchema = new StructType();
+        for (ColumnDesc columnDesc : effectiveColumns) {
+            if (!columnDesc.isComputedColumn()) {
+                kylinSchema = kylinSchema.add(columnDesc.getName(),
+                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
+            }
+        }
+        return kylinSchema;
     }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
index 6bdfe5ddcb..e091bb5b85 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/TableAnalysisJob.scala
@@ -27,6 +27,7 @@ import org.apache.kylin.engine.spark.utils.SparkConfHelper
 import org.apache.kylin.metadata.model.TableDesc
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.source.SourceFactory
+import org.apache.spark.application.NoRetryException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrameEnhancement._
 import org.apache.spark.sql._
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.functions._

 import scala.collection.JavaConverters._
+import scala.collection.mutable

 class TableAnalysisJob(tableDesc: TableDesc,
                        project: String,
@@ -60,6 +62,10 @@ class TableAnalysisJob(tableDesc: TableDesc,
       .getSourceData(tableDesc, ss, params)
       .coalesce(numPartitions)

+    if (!checkColumns(tableDesc, dataFrame)) {
+      throw new NoRetryException("Source table missing columns. Please reload table before sampling.")
+    }
+
     calculateViewMetasIfNeeded(tableDesc.getBackTickIdentity)

     val dat = dataFrame.localLimit(rowsTakenInEachPartition)
@@ -114,4 +120,21 @@ class TableAnalysisJob(tableDesc: TableDesc,
     ).toList
   }

+  def checkColumns(tableDesc: TableDesc, ds: Dataset[Row]): Boolean = {
+    logInfo(s"Check columns for table ${tableDesc.getIdentity}")
+    val sourceColumnSet: Set[String] = ds.columns.map(col => col.toUpperCase).toSet
+    val nonExistColumns: mutable.Set[String] = mutable.Set()
+    for (col <- tableDesc.getColumns) {
+      if (!col.isComputedColumn && !sourceColumnSet.contains(col.getName)) {
+        nonExistColumns.add(col.getName)
+      }
+    }
+    if (nonExistColumns.nonEmpty) {
+      logError(s"Check columns for table ${tableDesc.getIdentity} failed, missing following columns: ${nonExistColumns}")
+    } else {
+      logInfo(s"Check columns for table ${tableDesc.getIdentity} good")
+    }
+    nonExistColumns.isEmpty
+  }
+
 }
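The sampling path above takes the stricter route: instead of silently narrowing the column list, checkColumns collects the declared columns that the source no longer provides and fails the job with a non-retryable error. A rough Java-flavoured illustration of that guard, simplified and not the actual implementation (the real code is the Scala checkColumns above and throws NoRetryException):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public final class MissingColumnGuard {

        // Returns the declared (non-computed) column names that are absent from the source schema.
        public static List<String> missingColumns(String[] sourceColumns, List<String> declaredColumns) {
            Set<String> present = Arrays.stream(sourceColumns).map(String::toUpperCase).collect(Collectors.toSet());
            return declaredColumns.stream()
                    .filter(name -> !present.contains(name.toUpperCase()))
                    .collect(Collectors.toList());
        }

        // Fail fast so the sampling job ends in ERROR instead of sampling an inconsistent table.
        public static void assertNoneMissing(String table, String[] sourceColumns, List<String> declaredColumns) {
            List<String> missing = missingColumns(sourceColumns, declaredColumns);
            if (!missing.isEmpty()) {
                throw new IllegalStateException("Source table " + table + " missing columns " + missing
                        + ". Please reload the table before sampling.");
            }
        }
    }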
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index eaff79e101..3117741c9e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,9 +18,6 @@

 package org.apache.kylin.engine.spark.job.stage.build

-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -33,10 +30,11 @@ import org.apache.kylin.engine.spark.job.NSparkCubingUtil.convertFromDot
 import org.apache.kylin.engine.spark.job.stage.{BuildParam, StageExec}
 import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob, TableMetaManager}
 import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
 import org.apache.kylin.engine.spark.model.planner.{CuboIdToLayoutUtils, FlatTableToCostUtils}
+import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
+import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
@@ -50,12 +48,6 @@ import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils

 import java.math.BigInteger
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
-import org.apache.kylin.common.constant.LogConstant
-import org.apache.kylin.common.logging.SetLogCategory
-
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
@@ -65,8 +57,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}

-import org.apache.kylin.guava30.shaded.common.collect.Sets
-
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
index 98a5d7e5da..35b1c9f6d0 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobMonitor.scala
@@ -21,6 +21,7 @@ package org.apache.spark.application
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
 import org.apache.kylin.engine.spark.scheduler._
 import io.netty.util.internal.ThrowableUtil
+import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.common.util.Unsafe
 import org.apache.spark.autoheal.ExceptionTerminator
 import org.apache.spark.internal.Logging
@@ -80,7 +81,11 @@ class JobMonitor(eventLoop: KylinJobEventLoop) extends Logging {
   }

   def handleUnknownThrowable(ur: UnknownThrowable): Unit = {
-    eventLoop.post(JobFailed("Unknown error occurred during the job.", ur.throwable))
+    var msg = "Unknown error occurred during the job."
+    if (ur.throwable != null && StringUtils.isNotBlank(ur.throwable.getMessage)) {
+      msg = ur.throwable.getMessage
+    }
+    eventLoop.post(JobFailed(msg, ur.throwable))
   }
 }
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java
index a041825fe4..7e058d08f8 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NTableSamplingJobTest.java
@@ -23,13 +23,19 @@ import lombok.var;
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.job.dao.JobStatisticsManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.source.SourceFactory;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.After;
 import org.junit.Assert;
@@ -38,6 +44,8 @@ import org.junit.Test;

 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;

 import static org.awaitility.Awaitility.await;
@@ -105,6 +113,58 @@ public class NTableSamplingJobTest extends NLocalWithSparkSessionTest {
         Assert.assertEquals(samplingJob.getCreateTime(), tableExt.getCreateTime());
     }

+    @Test
+    public void testTableSamplingJobFailed_withCheckColumnsError() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final String TABLE_SAMPLING_TEST_TMP_DB = "TABLE_SAMPLING_TEST_TMP_DB";
+        overwriteSystemProp("kylin.source.ddl.logical-view.database", TABLE_SAMPLING_TEST_TMP_DB);
+
+        final String tableName = "DEFAULT.TEST_KYLIN_FACT";
+        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(config, PROJECT);
+        final TableDesc tableDesc = tableMgr.getTableDesc(tableName);
+        Assert.assertNotNull(tableDesc);
+
+        ColumnDesc[] columns = tableDesc.getColumns();
+        ColumnDesc[] columnsModified = Arrays.copyOfRange(columns, 0, columns.length + 2);
+        columnsModified[columnsModified.length - 2] = new ColumnDesc(
+                "13",
+                "A_CC_COL",
+                "boolean",
+                "",
+                "true|false|TRUE|FALSE|True|False",
+                null,
+                "non-empty expr");
+        columnsModified[columnsModified.length - 1] = new ColumnDesc(
+                "14",
+                "A_NON_EXIST_COL",
+                "boolean",
+                "",
+                "true|false|TRUE|FALSE|True|False",
+                null,
+                null);
+        TableDesc tableDescModified = tableMgr.copyForWrite(tableDesc);
+        tableDescModified.setDatabase(null);
+        tableDescModified.setColumns(columnsModified);
+        tableDescModified.setMvcc(-1L);
+        tableMgr.saveSourceTable(tableDescModified);
+
+        Map<String, String> params = NProjectManager.getInstance(config)
+                .getProject(PROJECT).getLegalOverrideKylinProps();
+        Dataset<Row> dataFrame = SourceFactory
+                .createEngineAdapter(tableDesc, NSparkCubingEngine.NSparkCubingSource.class)
+                .getSourceData(tableDesc, ss, params)
+                .coalesce(1);
+        dataFrame.createOrReplaceTempView(tableDescModified.getName());
+
+        NExecutableManager execMgr = NExecutableManager.getInstance(config, PROJECT);
+        val samplingJob = NTableSamplingJob.create(tableDescModified, PROJECT, "ADMIN", 20_000_000);
+        execMgr.addJob(samplingJob);
+        Assert.assertEquals(ExecutableState.READY, samplingJob.getStatus());
+
+        await().atMost(60000, TimeUnit.MINUTES).until(() -> !execMgr.getJob(samplingJob.getId()).getStatus().isProgressing());
+        Assert.assertEquals(ExecutableState.ERROR, samplingJob.getStatus());
+    }
+
     @Test
     public void testTableSamplingJobWithS3Role() {
         getTestConfig().setProperty("kylin.env.use-dynamic-S3-role-credential-in-table", "true");