This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 64b52dea660fc5feba34b8650557f1323d55b28b Author: fengguangyuan <qq272101...@gmail.com> AuthorDate: Thu Jul 27 15:46:27 2023 +0800 KYLIN-5771 Query cannot be interrupted during segment pruning Co-authored-by: Guangyuan Feng <guangyuan.f...@kyligence.io> --- .../kylin/query/util/QueryInterruptChecker.java | 53 +++- .../apache/kylin/query/util/SlowQueryDetector.java | 2 +- .../query/routing/PartitionPruningRuleTest.java | 313 +++++++++++++++++++++ .../query/routing/SegmentPruningRuleTest.java | 277 ++++++++++++++++++ .../kylin/query/routing/PartitionPruningRule.java | 15 + .../kylin/query/routing/SegmentPruningRule.java | 33 ++- .../kylin/query/util/QueryContextCutter.java | 5 + 7 files changed, 689 insertions(+), 9 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java index b163cc5a31..df4eb94589 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java @@ -20,6 +20,7 @@ package org.apache.kylin.query.util; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinRuntimeException; import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.query.exception.UserStopQueryException; @@ -32,15 +33,65 @@ public class QueryInterruptChecker { // This is Utils. } + /** + * @deprecated Use {@link #checkQueryCanceledOrThreadInterrupted(String, String)} instead. + * The semantics of this method are confusing in some scenarios. 
+ */ + @Deprecated public static void checkThreadInterrupted(String errorMsgLog, String stepInfo) { if (Thread.currentThread().isInterrupted()) { log.error("{} {}", QueryContext.current().getQueryId(), errorMsgLog); - if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) { + if (SlowQueryDetector.getRunningQueries().containsKey(Thread.currentThread()) + && SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) { throw new UserStopQueryException(""); } + QueryContext.current().getQueryTagInfo().setTimeout(true); throw new KylinTimeoutException("The query exceeds the set time limit of " + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo); } } + + /** + * Within the global context, STOP is the same as CANCEL, so stopping a query is equivalent to + * canceling it. + * There are several possible reasons to cancel the query recorded in the current thread context. + * {@link UserStopQueryException} is thrown when the query is stopped by a request, see + * {@link SlowQueryDetector#stopQuery(String)}. + * {@link KylinTimeoutException} is thrown when the query runs out of its timeout, see + * {@link SlowQueryDetector#checkTimeout()}. + * {@link UserStopQueryException} is also thrown when the stop/cancel flags are inconsistent with the interrupt status; callers may wrap the {@link InterruptedException} in a {@link KylinRuntimeException}. + * {@link InterruptedException} is thrown for any other interruption of the working thread. + * @param cause the reason for canceling the current query or interrupting the working thread + * @param step the processing point + */ + public static void checkQueryCanceledOrThreadInterrupted(String cause, String step) throws InterruptedException { + SlowQueryDetector.QueryEntry entry = SlowQueryDetector.getRunningQueries().getOrDefault(Thread.currentThread(), + null); + if (entry != null) { + if (entry.isStopByUser() && entry.getPlannerCancelFlag().isCancelRequested() + && Thread.currentThread().isInterrupted()) { + throw new UserStopQueryException(String.format("Manually stop the query %s. Caused: %s. 
Step: %s", + entry.getQueryId(), cause, step)); + } + + if (entry.getPlannerCancelFlag().isCancelRequested() && Thread.currentThread().isInterrupted()) { + QueryContext.current().getQueryTagInfo().setTimeout(true); + throw new KylinTimeoutException(String.format("Run out of time of the query %s. Caused: %s. Step: %s", + entry.getQueryId(), cause, step)); + } + + if (entry.isStopByUser() || entry.getPlannerCancelFlag().isCancelRequested()) { + throw new UserStopQueryException(String.format( + "You are trying to cancel the query %s with inconsistent states:" + + " [isStopByUser=%s, isCancelRequested=%s]! Caused: %s. Step: %s", + entry.getQueryId(), entry.isStopByUser(), entry.getPlannerCancelFlag().isCancelRequested(), + cause, step)); + } + } + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(String.format("Interrupted on thread %s. Caused: %s. Step: %s", + Thread.currentThread().getName(), cause, step)); + } + } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java index a2ca994c5f..bc00ce3754 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java @@ -196,7 +196,7 @@ public class SlowQueryDetector extends Thread { return (System.currentTimeMillis() - startTime) / 1000; } - private boolean setInterruptIfTimeout() { + public boolean setInterruptIfTimeout() { if (isAsyncQuery) { return false; } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/PartitionPruningRuleTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/PartitionPruningRuleTest.java new file mode 100644 index 0000000000..0cb9e8f62f --- /dev/null +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/PartitionPruningRuleTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.routing; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.exception.KylinTimeoutException; +import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.metadata.cube.model.LayoutPartition; +import org.apache.kylin.metadata.cube.model.NDataLayout; +import org.apache.kylin.metadata.cube.model.NDataSegment; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.model.NDataflowUpdate; +import org.apache.kylin.metadata.model.MultiPartitionDesc; +import org.apache.kylin.metadata.model.NDataModelManager; +import 
org.apache.kylin.query.exception.UserStopQueryException; +import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.util.SlowQueryDetector; +import org.apache.kylin.util.OlapContextTestUtil; +import org.junit.Assert; +import org.junit.Test; + +public class PartitionPruningRuleTest extends NLocalWithSparkSessionTest { + + private Candidate prepareCandidate() throws SqlParseException { + String project = "multi_level_partition"; + String modelId = "747f864b-9721-4b97-acde-0aa8e8656cba"; + NDataModelManager modelManager = NDataModelManager.getInstance(getTestConfig(), project); + + String sql = "select cal_dt, sum(price) from test_kylin_fact" + " inner join test_account" + + " on test_kylin_fact.seller_id = test_account.account_id" + + " where cal_dt > '2012-01-01' and cal_dt < '2012-01-04' and lstg_site_id = 1 group by cal_dt"; + OLAPContext olapContext = OlapContextTestUtil.getOlapContexts(project, sql, true).get(0); + + int newPartitionsNum = 1000_000; + + modelManager.updateDataModel(modelId, copied -> { + for (int i = 0; i < newPartitionsNum; i++) { + copied.getMultiPartitionDesc() + .getPartitions() + .add(new MultiPartitionDesc.PartitionInfo(i + 100, new String[] { String.valueOf(i + 100) })); + } + }); + + NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), project); + NDataflow dataflowCopied = dataflowManager.getDataflow(modelId).copy(); + + // append new partitions + NDataflowUpdate dataflowUpdate = new NDataflowUpdate(modelId); + List<NDataLayout> toUpdateLayouts = new ArrayList<>(); + for (NDataSegment segment : dataflowCopied.getSegments()) { + toUpdateLayouts.add(segment.getLayout(1)); + for (int i = 0; i < newPartitionsNum; i++) { + LayoutPartition layoutPartition = new LayoutPartition(i + 100); + layoutPartition.setBucketId(i + 100); + segment.getLayout(1).getMultiPartition().add(layoutPartition); + } + } + dataflowUpdate.setToAddOrUpdateLayouts(toUpdateLayouts.toArray(new NDataLayout[0])); + 
dataflowManager.updateDataflow(dataflowUpdate); + + NDataflow dataflow = dataflowManager.getDataflow(modelId); + Map<String, String> sqlAlias2ModelNameMap = OlapContextTestUtil.matchJoins(dataflow.getModel(), olapContext); + olapContext.fixModel(dataflow.getModel(), sqlAlias2ModelNameMap); + + Candidate candidate = new Candidate(dataflow, olapContext, sqlAlias2ModelNameMap); + candidate.getQueryableSeg() + .setBatchSegments(ImmutableList.of(dataflow.getSegment("8892fa3f-f607-4eec-8159-7c5ae2f16942"))); + return candidate; + } + + @Test + public void testCancelAndInterruptPruning() throws SqlParseException { + Candidate candidate = prepareCandidate(); + testCancelQuery(candidate); + testCancelQuery(candidate, queryEntry -> queryEntry.getPlannerCancelFlag().requestCancel()); + testCancelQuery(candidate, queryEntry -> queryEntry.setStopByUser(true)); + testCancelAsyncQuery(candidate); + testInterrupt(candidate); + testTimeout(candidate); + } + + private void testCancelQuery(Candidate candidate) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicBoolean res = new AtomicBoolean(false); + AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + slowQueryDetector.set(new SlowQueryDetector(100, 10_000)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + PartitionPruningRule rule = new PartitionPruningRule(); + rule.apply(candidate); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS) + .until(() -> queryEntry.get() != null); + + slowQueryDetector.get().stopQuery("pruning"); + + Assert.assertFalse(queryEntry.get().isAsyncQuery()); + 
Assert.assertTrue(queryEntry.get().isStopByUser() + && queryEntry.get().getPlannerCancelFlag().isCancelRequested()); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertFalse(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), + exp.get() instanceof UserStopQueryException); + } + + private void testCancelQuery(Candidate candidate, Consumer<SlowQueryDetector.QueryEntry> updater) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicBoolean res = new AtomicBoolean(false); + AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + slowQueryDetector.set(new SlowQueryDetector(100, 10_000)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + PartitionPruningRule rule = new PartitionPruningRule(); + rule.apply(candidate); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS) + .until(() -> queryEntry.get() != null); + + Assert.assertTrue(t.isAlive()); + + updater.accept(queryEntry.get()); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertFalse(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), + exp.get() instanceof UserStopQueryException); + Assert.assertTrue(exp.get().getMessage().contains("inconsistent states")); + } + + private void testCancelAsyncQuery(Candidate candidate) { + AtomicReference<Exception> exp 
= new AtomicReference<>(null); + AtomicBoolean res = new AtomicBoolean(false); + AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(true); + slowQueryDetector.set(new SlowQueryDetector(100, 10_000)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + PartitionPruningRule rule = new PartitionPruningRule(); + rule.apply(candidate); + res.set(true); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS) + .until(() -> queryEntry.get() != null); + + String queryId = queryEntry.get().getQueryId(); + slowQueryDetector.get().stopQuery(queryId); + + Assert.assertTrue(queryEntry.get().isAsyncQuery()); + Assert.assertTrue(queryEntry.get().isStopByUser() + && queryEntry.get().getPlannerCancelFlag().isCancelRequested()); + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertFalse(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), + exp.get() instanceof UserStopQueryException); + } + + private void testInterrupt(Candidate candidate) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicBoolean res = new AtomicBoolean(false); + + Thread t = new Thread(() -> { + try { + PartitionPruningRule rule = new PartitionPruningRule(); + rule.apply(candidate); + res.set(true); + } catch (Exception e) { + exp.set(e); + } + }); + t.start(); + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until(t::isAlive); + + t.interrupt(); 
+ + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertFalse(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinRuntimeException); + Assert.assertTrue(exp.get().getCause() instanceof InterruptedException); + } + + private void testTimeout(Candidate candidate) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicBoolean res = new AtomicBoolean(false); + AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(false); + slowQueryDetector.set(new SlowQueryDetector(10, 100)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + PartitionPruningRule rule = new PartitionPruningRule(); + rule.apply(candidate); + res.set(true); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + + await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(500, TimeUnit.MILLISECONDS) + .atMost(60, TimeUnit.SECONDS).until(() -> queryEntry.get() != null); + + Assert.assertTrue(queryEntry.get().setInterruptIfTimeout()); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + Assert.assertTrue(!t.isAlive() && queryEntry.get().getPlannerCancelFlag().isCancelRequested()); + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertFalse(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinTimeoutException); + } +} diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java index 5ffafcb0a6..7468c9706a 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/query/routing/SegmentPruningRuleTest.java @@ -18,13 +18,21 @@ package org.apache.kylin.query.routing; +import java.util.Calendar; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.rex.RexExecutorImpl; +import org.apache.calcite.sql.parser.SqlParseException; import org.apache.hadoop.util.Shell; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.common.util.TempMetadataBuilder; import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; @@ -33,11 +41,17 @@ import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.model.NDataflowUpdate; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.query.engine.TypeSystem; import org.apache.kylin.query.engine.meta.SimpleDataContext; +import org.apache.kylin.query.exception.UserStopQueryException; import 
org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.util.OlapContextTestUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparderEnv; @@ -52,6 +66,8 @@ import org.springframework.test.util.ReflectionTestUtils; import lombok.val; +import static org.awaitility.Awaitility.await; + public class SegmentPruningRuleTest extends NLocalWithSparkSessionTest { @BeforeClass @@ -215,6 +231,267 @@ public class SegmentPruningRuleTest extends NLocalWithSparkSessionTest { Long.parseLong("1633928400000"), dateType); } + @Test + public void testCancelAndInterruptPruning() throws SqlParseException { + val dataflowId = "3718b614-5191-2254-77e9-f4c5ca64e312"; + KylinConfig kylinConfig = getTestConfig(); + + String sql = "SELECT * FROM TEST_DB.DATE_TIMESTAMP_TABLE WHERE id = '121' AND (\n" + + "(TIMESTAMP_10 >= '2021-11-03')\n" + "AND (TIMESTAMP_10 <= '2021-11-04')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-03')\n" + "AND (TIMESTAMP_10 <= '2021-11-05')\n" + ")\n" + "OR (\n" + "(\n" + + "TIMESTAMP_10 >= '2021-11-03'\n" + ")\n" + "AND (TIMESTAMP_10 <= '2021-11-06')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-04')\n" + "AND (TIMESTAMP_10 <= '2021-11-04')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-04')\n" + "AND (TIMESTAMP_10 <= '2021-11-05')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-04')\n" + "AND (TIMESTAMP_10 <= '2021-11-06')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-07')\n" + "AND (TIMESTAMP_10 <= '2021-11-07')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-07')\n" + "AND (TIMESTAMP_10 <= '2021-11-08')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-07')\n" + "AND (TIMESTAMP_10 <= '2021-11-12')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-17')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-18')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= 
'2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-19')\n" + ")\n" + "OR (\n" + + "(TIMESTAMP_10 >= '2021-11-17')\n" + "AND (TIMESTAMP_10 <= '2021-11-17')\n" + ")"; + + String project = getProject(); + NDataflowManager dataflowManager = NDataflowManager.getInstance(kylinConfig, project); + List<OLAPContext> olapContexts = OlapContextTestUtil.getOlapContexts(getProject(), sql); + OLAPContext context = olapContexts.get(0); + + // append new segments + Calendar calendar = Calendar.getInstance(); + calendar.set(2021, Calendar.DECEMBER, 6); + + final int newSegmentCount = 100_000; + NDataSegment[] newSegments = new NDataSegment[newSegmentCount]; + for (int i = 0; i < newSegmentCount; i++) { + long startTime = calendar.getTimeInMillis(); + calendar.add(Calendar.DAY_OF_MONTH, 1); + long endTime = calendar.getTimeInMillis(); + NDataSegment newSegment = new NDataSegment(null, + new SegmentRange.TimePartitionedSegmentRange(startTime, endTime)); + newSegment.setStatus(SegmentStatusEnum.READY); + newSegments[i] = newSegment; + } + + NDataflowUpdate update = new NDataflowUpdate(dataflowId); + update.setToAddSegs(newSegments); + dataflowManager.updateDataflowWithoutIndex(update); + + NDataflow dataflow = dataflowManager.getDataflow(dataflowId); + + CalciteSchema rootSchema = new QueryExec(project, kylinConfig).getRootSchema(); + SimpleDataContext dataContext = new SimpleDataContext(rootSchema.plus(), TypeSystem.javaTypeFactory(), + kylinConfig); + context.firstTableScan.getCluster().getPlanner().setExecutor(new RexExecutorImpl(dataContext)); + Map<String, String> map = RealizationChooser.matchJoins(dataflow.getModel(), context, false, false); + context.fixModel(dataflow.getModel(), map); + + Assert.assertTrue("Unexpected size " + dataflow.getQueryableSegments().size(), + dataflow.getQueryableSegments().size() > newSegmentCount); + + testCancelQuery(dataflow, context); + testCancelQuery(dataflow, context, queryEntry -> queryEntry.getPlannerCancelFlag().requestCancel()); + 
testCancelQuery(dataflow, context, queryEntry -> queryEntry.setStopByUser(true)); + testCancelAsyncQuery(dataflow, context); + testInterrupt(dataflow, context); + testTimeout(dataflow, context); + } + + private void testCancelQuery(NDataflow dataflow, OLAPContext context) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + slowQueryDetector.set(new SlowQueryDetector(100, 10_000)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + res.set(new SegmentPruningRule().pruneSegments(dataflow, context)); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS) + .until(() -> queryEntry.get() != null); + + slowQueryDetector.get().stopQuery("pruning"); + + Assert.assertFalse(queryEntry.get().isAsyncQuery()); + Assert.assertTrue(queryEntry.get().isStopByUser() + && queryEntry.get().getPlannerCancelFlag().isCancelRequested()); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertNull(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), + exp.get() instanceof UserStopQueryException); + } + + private void testCancelQuery(NDataflow dataflow, OLAPContext context, + Consumer<SlowQueryDetector.QueryEntry> updater) { + + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null); + 
AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + slowQueryDetector.set(new SlowQueryDetector(100, 10_000)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + res.set(new SegmentPruningRule().pruneSegments(dataflow, context)); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS) + .until(() -> queryEntry.get() != null); + + Assert.assertTrue(t.isAlive()); + + updater.accept(queryEntry.get()); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertNull(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), + exp.get() instanceof UserStopQueryException); + Assert.assertTrue(exp.get().getMessage().contains("inconsistent states")); + } + + private void testCancelAsyncQuery(NDataflow dataflow, OLAPContext context) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(true); + slowQueryDetector.set(new SlowQueryDetector(100, 10_000)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + res.set(new SegmentPruningRule().pruneSegments(dataflow, context)); + } catch 
(Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS) + .until(() -> queryEntry.get() != null); + + String queryId = queryEntry.get().getQueryId(); + slowQueryDetector.get().stopQuery(queryId); + + Assert.assertTrue(queryEntry.get().isAsyncQuery()); + Assert.assertTrue(queryEntry.get().isStopByUser() + && queryEntry.get().getPlannerCancelFlag().isCancelRequested()); + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertNull(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), + exp.get() instanceof UserStopQueryException); + } + + private void testInterrupt(NDataflow dataflow, OLAPContext context) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + res.set(new SegmentPruningRule().pruneSegments(dataflow, context)); + } catch (Exception e) { + exp.set(e); + } + }); + t.start(); + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(6, TimeUnit.SECONDS).until(t::isAlive); + + t.interrupt(); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertNull(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinRuntimeException); + Assert.assertTrue(exp.get().getCause() instanceof InterruptedException); + } + + private void testTimeout(NDataflow dataflow, OLAPContext context) { + AtomicReference<Exception> exp = new AtomicReference<>(null); + AtomicReference<Segments<NDataSegment>> res = new AtomicReference<>(null); + 
AtomicReference<SlowQueryDetector> slowQueryDetector = new AtomicReference<>(null); + AtomicReference<SlowQueryDetector.QueryEntry> queryEntry = new AtomicReference<>(null); + + Thread t = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(false); + slowQueryDetector.set(new SlowQueryDetector(10, 100)); + slowQueryDetector.get().queryStart("pruning"); + queryEntry.set(SlowQueryDetector.getRunningQueries().get(Thread.currentThread())); + res.set(new SegmentPruningRule().pruneSegments(dataflow, context)); + } catch (Exception e) { + exp.set(e); + } finally { + slowQueryDetector.get().queryEnd(); + } + }); + t.start(); + + await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(100, TimeUnit.MILLISECONDS) + .atMost(60, TimeUnit.SECONDS).until(() -> queryEntry.get() != null); + + Assert.assertTrue(queryEntry.get().setInterruptIfTimeout()); + + try { + t.join(); + } catch (InterruptedException e) { + // ignored + } + + Assert.assertTrue(!t.isAlive() && queryEntry.get().getPlannerCancelFlag().isCancelRequested()); + + await().pollInterval(10, TimeUnit.MILLISECONDS).atMost(5, TimeUnit.SECONDS).until(() -> exp.get() != null); + + Assert.assertNull(res.get()); + Assert.assertTrue("Unexpected exception " + exp.get().getMessage(), exp.get() instanceof KylinTimeoutException); + } + @Test public void testSegmentPruningTimestampType() throws Exception { val dataflowId = "3718b614-5191-2254-77e9-f4c5ca64e312"; diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java index 9e0eb263c8..c9814b1f34 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/PartitionPruningRule.java @@ -34,6 +34,8 @@ import org.apache.calcite.rex.RexSimplify; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import 
org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; @@ -47,8 +49,10 @@ import org.apache.kylin.metadata.model.MultiPartitionKeyMappingImpl; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.CapabilityResult; +import org.apache.kylin.query.exception.UserStopQueryException; import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.relnode.OLAPTableScan; +import org.apache.kylin.query.util.QueryInterruptChecker; import org.apache.kylin.query.util.RexUtils; import lombok.extern.slf4j.Slf4j; @@ -156,6 +160,10 @@ public class PartitionPruningRule extends PruningRule { List<TblColRef> partitionColRefs = model.getMultiPartitionDesc().getColumnRefs(); for (MultiPartitionDesc.PartitionInfo partition : model.getMultiPartitionDesc().getPartitions()) { try { + QueryInterruptChecker.checkQueryCanceledOrThreadInterrupted( + "Interrupted during pruning partitions!", + "pruning partitions"); + RexNode partitionRex = partitionToRexCall(partitionColRefs, partition.getValues(), rexBuilder, olapContext.allTableScans); RexNode mappingColRex = multiPartitionKeyMappingToRex(rexBuilder, partition.getValues(), @@ -175,6 +183,13 @@ public class PartitionPruningRule extends PruningRule { segPartitionMap.forEach((dataSegment, partitionIds) -> partitionIds.remove(partition.getId())); continue; } + } catch (InterruptedException ie) { + log.error(String.format("Interrupted on pruning partitions from %s!", partition.toString()), ie); + Thread.currentThread().interrupt(); + throw new KylinRuntimeException(ie); + } catch 
(UserStopQueryException | KylinTimeoutException e) { + log.error(String.format("Stop pruning partitions from %s!", partition.toString()), e); + throw e; } catch (Exception ex) { log.warn("Multi-partition pruning error: ", ex); } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java b/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java index 3464271cc2..5618417e9d 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/routing/SegmentPruningRule.java @@ -47,6 +47,8 @@ import org.apache.calcite.util.NlsString; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.time.DateUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Pair; import org.apache.kylin.guava30.shaded.common.collect.ImmutableSet; @@ -66,7 +68,9 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.realization.CapabilityResult; import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.query.exception.UserStopQueryException; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.util.QueryInterruptChecker; import org.apache.kylin.query.util.RexUtils; import lombok.val; @@ -187,29 +191,44 @@ public class SegmentPruningRule extends PruningRule { Segments<NDataSegment> selectedSegments = new Segments<>(); PartitionDesc partitionCol = getPartitionDesc(dataflow, olapContext); RexBuilder rexBuilder = olapContext.firstTableScan.getCluster().getRexBuilder(); + for (NDataSegment dataSegment : dataflow.getQueryableSegments()) { try { + 
QueryInterruptChecker.checkQueryCanceledOrThreadInterrupted( + "Interrupted during pruning segments by partition filter!", + "pruning segments by partition filter"); + + // To improve this simplification after fixed https://olapio.atlassian.net/browse/KE-42295 + // evaluate segment range [startTime, endTime) val segmentRanges = transformSegment2RexCall(dataSegment, partitionCol.getPartitionDateFormat(), - rexBuilder, partitionColInputRef, partitionCol.getPartitionDateColumnRef().getType(), - dataflow.isStreaming()); + rexBuilder, partitionColInputRef, partitionCol.getPartitionDateColumnRef().getType(), + dataflow.isStreaming()); // compare with segment start val segmentStartPredicate = RelOptPredicateList.of(rexBuilder, - Lists.newArrayList(segmentRanges.getFirst())); + Lists.newArrayList(segmentRanges.getFirst())); var simplifiedWithPredicate = rexSimplify.withPredicates(segmentStartPredicate) - .simplify(simplifiedSqlFilter); + .simplify(simplifiedSqlFilter); if (simplifiedWithPredicate.isAlwaysFalse()) { continue; } // compare with segment end val segmentEndPredicate = RelOptPredicateList.of(rexBuilder, - Lists.newArrayList(segmentRanges.getSecond())); + Lists.newArrayList(segmentRanges.getSecond())); simplifiedWithPredicate = rexSimplify.withPredicates(segmentEndPredicate) - .simplify(simplifiedWithPredicate); + .simplify(simplifiedWithPredicate); if (!simplifiedWithPredicate.isAlwaysFalse()) { selectedSegments.add(dataSegment); } + + } catch (InterruptedException ie) { + log.error(String.format("Interrupted on pruning segments from %s!", dataSegment.toString()), ie); + Thread.currentThread().interrupt(); + throw new KylinRuntimeException(ie); + } catch (UserStopQueryException | KylinTimeoutException e) { + log.error(String.format("Stop pruning segments from %s!", dataSegment.toString()), e); + throw e; } catch (Exception ex) { - log.warn("Segment pruning error: ", ex); + log.warn(String.format("To skip the exception on pruning segment %s!", 
dataSegment.toString()), ex); selectedSegments.add(dataSegment); } } diff --git a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java index fc6734dfcc..b6620a0091 100644 --- a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java +++ b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java @@ -25,11 +25,14 @@ import java.util.Locale; import org.apache.calcite.rel.RelNode; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinRuntimeException; +import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.realization.NoRealizationFoundException; import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; +import org.apache.kylin.query.exception.UserStopQueryException; import org.apache.kylin.query.relnode.ContextUtil; import org.apache.kylin.query.relnode.KapRel; import org.apache.kylin.query.relnode.OLAPContext; @@ -86,6 +89,8 @@ public class QueryContextCutter { throw e; } reCutStrategy.tryCutToSmallerContexts(root, e); + } catch (UserStopQueryException | KylinTimeoutException | KylinRuntimeException e) { + throw e; } finally { // auto-modeling should invoke unfixModel() because it may select some realizations. if (isReCutBanned) {