This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6cba5e63396e1e2e56703b4c334e7a4df7a23dda
Author: fanshu.kong <1714585...@qq.com>
AuthorDate: Tue Sep 27 17:09:08 2022 +0800

    KYLIN-5322 fix select count when out of segment range
---
 .../engine/exec/sparder/SparderQueryPlanExec.java  | 10 ++---
 .../kylin/query/runtime/plan/TableScanPlan.scala   | 43 ++++++++++++++-----
 .../query/runtime/plan/SegmentEmptyTest.scala      | 50 ++++++++++++++++++++++
 .../org/apache/spark/sql/SparderTypeUtil.scala     | 13 +++---
 4 files changed, 94 insertions(+), 22 deletions(-)

diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
index a9a9963ae4..bbe7d25192 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
@@ -32,8 +32,6 @@ import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.relnode.OLAPRel;
 import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.query.engine.exec.ExecuteResult;
@@ -43,6 +41,8 @@ import org.apache.kylin.query.engine.meta.SimpleDataContext;
 import org.apache.kylin.query.relnode.ContextUtil;
 import org.apache.kylin.query.relnode.KapContext;
 import org.apache.kylin.query.relnode.KapRel;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.relnode.OLAPRel;
 import org.apache.kylin.query.runtime.SparkEngine;
 import org.apache.kylin.query.util.QueryContextCutter;
 import org.apache.spark.SparkException;
@@ -77,7 +77,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
                 || KapConfig.wrap(((SimpleDataContext) dataContext).getKylinConfig()).runConstantQueryLocally()) {
             val contexts = ContextUtil.listContexts();
             for (OLAPContext context : contexts) {
-                if (context.olapSchema != null && context.storageContext.isEmptyLayout()) {
+                if (context.olapSchema != null && context.storageContext.isEmptyLayout() && !context.isHasAgg()) {
                     QueryContext.fillEmptyResultSetMetrics();
                     return new ExecuteResult(Lists.newArrayList(), 0);
                 }
@@ -134,7 +134,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
                 QueryContext.current().getSecondStorageUsageMap().clear();
             } else if (e instanceof SQLException) {
                 handleForceToTieredStorage(e);
-            }else {
+            } else {
                 return ExceptionUtils.rethrow(e);
             }
         }
@@ -186,7 +186,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
     }
 
     private void handleForceToTieredStorage(final Exception e) {
-        if (e.getMessage().equals(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)){
+        if (e.getMessage().equals(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)) {
             ForceToTieredStorage forcedToTieredStorage = QueryContext.current().getForcedToTieredStorage();
             boolean forceTableIndex = QueryContext.current().isForceTableIndex();
             QueryContext.current().setLastFailed(true);
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
index a10d0a16f3..5c79e0f113 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
@@ -25,23 +25,21 @@ import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate
 import org.apache.kylin.metadata.cube.gridtable.NLayoutToGridTableMapping
 import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.kylin.metadata.cube.realization.HybridRealization
-import org.apache.kylin.metadata.model.NTableMetadataManager
-import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
 import org.apache.kylin.metadata.model._
+import org.apache.kylin.metadata.realization.IRealization
 import org.apache.kylin.metadata.tuple.TupleInfo
+import org.apache.kylin.query.implicits.sessionToQueryContext
+import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
+import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.manager.SparderLookupManager
 import org.apache.spark.sql.types.{ArrayType, DoubleType, StructField, StructType}
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, _}
+import org.apache.spark.sql._
 
 import java.util.concurrent.ConcurrentHashMap
 import java.{lang, util}
-import org.apache.kylin.metadata.realization.IRealization
-import org.apache.kylin.query.implicits.sessionToQueryContext
-import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
-
 import scala.collection.JavaConverters._
 
 // scalastyle:off
@@ -71,7 +69,8 @@ object TableScanPlan extends LogEx {
     val realizations = olapContext.realization.getRealizations.asScala.toList
     realizations.map(_.asInstanceOf[NDataflow])
       .filter(dataflow => (!dataflow.isStreaming && !context.isBatchCandidateEmpty) ||
-        (dataflow.isStreaming && !context.isStreamCandidateEmpty))
+        (dataflow.isStreaming && !context.isStreamCandidateEmpty) ||
+        isSegmentsEmpty(prunedSegments, prunedStreamingSegments))
       .map(dataflow => {
         if (dataflow.isStreaming) {
           tableScan(rel, dataflow, olapContext, session, prunedStreamingSegments, context.getStreamingCandidate)
@@ -81,13 +80,33 @@ object TableScanPlan extends LogEx {
       }).reduce(_.union(_))
   }
 
+  // prunedSegments is null
+  def tableScanEmptySegment(rel: KapRel): DataFrame = {
+    logInfo("prunedSegments is null")
+    val df = SparkOperation.createEmptyDataFrame(
+      StructType(
+        rel.getColumnRowType.getAllColumns.asScala
+          .map(column =>
+            StructField(column.toString.replaceAll("\\.", "_"), SparderTypeUtil.toSparkType(column.getType)))))
+    val cols = df.schema.map(structField => {
+      col(structField.name)
+    })
+    df.select(cols: _*)
+  }
+
+  def isSegmentsEmpty(prunedSegments: util.List[NDataSegment], prunedStreamingSegments: util.List[NDataSegment]): Boolean = {
+    val isPrunedSegmentsEmpty = prunedSegments == null || prunedSegments.size() == 0
+    val isPrunedStreamingSegmentsEmpty = prunedStreamingSegments == null || prunedStreamingSegments.size() == 0
+    isPrunedSegmentsEmpty && isPrunedStreamingSegmentsEmpty
+  }
+
   def tableScan(rel: KapRel, dataflow: NDataflow, olapContext: OLAPContext, session: SparkSession,
                 prunedSegments: util.List[NDataSegment], candidate: NLayoutCandidate): DataFrame = {
     val prunedPartitionMap = olapContext.storageContext.getPrunedPartitions
     olapContext.resetSQLDigest()
     //TODO: refactor
     val cuboidLayout = candidate.getLayoutEntity
-    if (cuboidLayout.getIndex.isTableIndex) {
+    if (cuboidLayout.getIndex != null && cuboidLayout.getIndex.isTableIndex) {
       QueryContext.current().getQueryTagInfo.setTableIndex(true)
     }
     val tableName = olapContext.firstTableScan.getBackupAlias
@@ -97,6 +116,9 @@ object TableScanPlan extends LogEx {
     /////////////////////////////////////////////
     val kapConfig = KapConfig.wrap(dataflow.getConfig)
     val basePath = kapConfig.getReadParquetStoragePath(dataflow.getProject)
+    if (prunedSegments == null || prunedSegments.size() == 0) {
+      return tableScanEmptySegment(rel: KapRel)
+    }
     val fileList = prunedSegments.asScala.map(
       seg => toLayoutPath(dataflow, cuboidLayout.getId, basePath, seg, prunedPartitionMap)
     )
@@ -366,8 +388,7 @@ object TableScanPlan extends LogEx {
     val session = SparderEnv.getSparkSession
     val olapContext = rel.getContext
     var instance: IRealization = null
-    if (olapContext.realization.isInstanceOf[NDataflow])
-    {
+    if (olapContext.realization.isInstanceOf[NDataflow]) {
       instance = olapContext.realization.asInstanceOf[NDataflow]
     } else {
       instance = olapContext.realization.asInstanceOf[HybridRealization]
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
new file mode 100644
index 0000000000..03e0577013
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.runtime.plan
+
+import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
+import org.junit.Assert
+
+import java.util
+
+class SegmentEmptyTest extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata {
+
+  val prunedSegment1 = null
+  val prunedSegment2 = new util.LinkedList[NDataSegment]
+  val prunedSegment3 = new util.LinkedList[NDataSegment]
+  prunedSegment3.add(new NDataSegment())
+
+  val prunedStreamingSegment1 = null
+  val prunedStreamingSegment2 = new util.LinkedList[NDataSegment]
+  val prunedStreamingSegment3 = new util.LinkedList[NDataSegment]
+  prunedStreamingSegment3.add(new NDataSegment())
+
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1))
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3))
+
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1))
+  Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3))
+
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2))
+  Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3))
+}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
index 2a2d1d02ab..6791718885 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
@@ -18,11 +18,6 @@
 
 package org.apache.spark.sql.util
 
-import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
-import java.math.BigDecimal
-import java.sql.{Date, Timestamp, Types}
-import java.time.ZoneId
-import java.util.{GregorianCalendar, Locale, TimeZone}
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexLiteral
@@ -33,7 +28,7 @@ import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.metadata.datatype.DataType
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.expressions.{Base64, Cast}
+import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.parser.ParserUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
@@ -41,6 +36,11 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.springframework.util.Base64Utils
 
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
+import java.math.BigDecimal
+import java.sql.{Date, Timestamp, Types}
+import java.time.ZoneId
+import java.util.{GregorianCalendar, Locale, TimeZone}
 import scala.collection.{immutable, mutable}
 
 object SparderTypeUtil extends Logging {
@@ -117,6 +117,7 @@ object SparderTypeUtil extends Logging {
       case tp if tp.startsWith("extendedcolumn") => BinaryType
       case tp if tp.startsWith("percentile") => BinaryType
       case tp if tp.startsWith("raw") => BinaryType
+      case "any" => StringType
       case _ => throw new IllegalArgumentException(dataTp.toString)
     }
   }
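
For readers following the change, the effect of the empty-segment fallback in TableScanPlan can be reproduced with plain Spark APIs: when segment pruning leaves no segments, the scan is replaced by an empty DataFrame that still carries the layout's column schema, so a query such as select count(*) returns 0 instead of erroring out. The sketch below is illustrative only; the column names are hypothetical and the internal SparkOperation.createEmptyDataFrame helper is replaced by the public createDataFrame API, so this is not the committed code.

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object EmptySegmentScanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("empty-segment-sketch").getOrCreate()

    // Hypothetical stand-in for the pruned layout's column schema.
    val schema = StructType(Seq(
      StructField("TEST_KYLIN_FACT_TRANS_ID", LongType),
      StructField("TEST_KYLIN_FACT_LSTG_FORMAT_NAME", StringType)))

    // No segment survived pruning: scan an empty DataFrame that keeps the schema
    // instead of reading segment parquet files that do not exist.
    val emptyScan = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    // A count over the empty scan evaluates to 0 rather than failing.
    println(emptyScan.count()) // prints 0

    spark.stop()
  }
}

This mirrors tableScanEmptySegment above, which builds the schema from the relation's column row type and then selects those columns from the empty DataFrame.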