This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.0.1 by this push: new cc7da636db [fix](partition-cache) fix result may not write when enable partition cache (#10319) cc7da636db is described below commit cc7da636db0a680728ff7e951dff7cfe03c060f2 Author: Zhengguo Yang <yangz...@gmail.com> AuthorDate: Wed Jun 22 23:42:46 2022 +0800 [fix](partition-cache) fix result may not write when enable partition cache (#10319) fix result may not write when enable partition cache --- .../java/org/apache/doris/qe/ConnectProcessor.java | 2 +- .../java/org/apache/doris/qe/StmtExecutor.java | 76 ++++++++------------ .../org/apache/doris/qe/cache/CacheAnalyzer.java | 6 ++ .../data/query/cache/partition_cache.out | 14 ++++ .../suites/query/cache/partition_cache.groovy | 80 ++++++++++++++++++++++ 5 files changed, 128 insertions(+), 50 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 87f2f9f294..a0da3f94f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -386,7 +386,7 @@ public class ConnectProcessor { if (resultSet == null) { packet = executor.getOutputPacket(); } else { - executor.sendResult(resultSet); + executor.sendResultSet(resultSet); packet = getResultPacket(); if (packet == null) { LOG.debug("packet == null"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ec36508e0f..2f1124234d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -837,8 +837,8 @@ public class StmtExecutor implements ProfileWriter { /** * Handle the SelectStmt via Cache. */ - private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception { - RowBatch batch = null; + private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) + throws Exception { InternalService.PFetchCacheResult cacheResult = cacheAnalyzer.getCacheData(); CacheMode mode = cacheAnalyzer.getCacheMode(); SelectStmt newSelectStmt = selectStmt; @@ -862,44 +862,7 @@ public class StmtExecutor implements ProfileWriter { planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift()); } } - - coord = new Coordinator(context, analyzer, planner); - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - coord.exec(); - - while (true) { - batch = coord.getNext(); - if (batch.getBatch() != null) { - cacheAnalyzer.copyRowBatch(batch); - if (!isSendFields) { - sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs())); - isSendFields = true; - } - for (ByteBuffer row : batch.getBatch().getRows()) { - channel.sendOnePacket(row); - } - context.updateReturnRows(batch.getBatch().getRows().size()); - } - if (batch.isEos()) { - break; - } - } - - if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) { - isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false); - } - - cacheAnalyzer.updateCache(); - - if (!isSendFields) { - sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs())); - isSendFields = true; - } - - statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder(); - context.getState().setEof(); - return; + sendResult(false, isSendFields, newSelectStmt, channel, cacheAnalyzer, cacheResult); } private boolean handleSelectRequestInFe(SelectStmt parsedSelectStmt) throws IOException { @@ -921,7 +884,7 @@ public class StmtExecutor implements ProfileWriter { } } ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); - sendResult(resultSet); + sendResultSet(resultSet); return true; } @@ -955,7 +918,7 @@ public class StmtExecutor implements ProfileWriter { return; } - RowBatch batch; + MysqlChannel channel = context.getMysqlChannel(); boolean isOutfileQuery = queryStmt.hasOutFileClause(); @@ -965,8 +928,12 @@ public class StmtExecutor implements ProfileWriter { handleCacheStmt(cacheAnalyzer, channel, (SelectStmt) queryStmt); return; } + sendResult(isOutfileQuery, false, queryStmt, channel, null, null); + } + - // send result + private void sendResult(boolean isOutfileQuery, boolean isSendFields, QueryStmt queryStmt, MysqlChannel channel, + CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception { // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx, // We will not send real query result to client. Instead, we only send OK to client with // number of rows selected. For example: @@ -974,7 +941,7 @@ public class StmtExecutor implements ProfileWriter { // Query OK, 10 rows affected (0.01 sec) // // 2. If this is a query, send the result expr fields first, and send result data back to client. - boolean isSendFields = false; + RowBatch batch; coord = new Coordinator(context, analyzer, planner); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); @@ -986,7 +953,11 @@ public class StmtExecutor implements ProfileWriter { batch = coord.getNext(); // for outfile query, there will be only one empty batch send back with eos flag if (batch.getBatch() != null) { - // For some language driver, getting error packet after fields packet will be recognized as a success result + if (cacheAnalyzer != null) { + cacheAnalyzer.copyRowBatch(batch); + } + // For some language driver, getting error packet after fields packet + // will be recognized as a success result // so We need to send fields after first batch arrived if (!isSendFields) { if (!isOutfileQuery) { @@ -1005,6 +976,14 @@ public class StmtExecutor implements ProfileWriter { break; } } + if (cacheAnalyzer != null) { + if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) { + isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt, + isSendFields, false); + } + + cacheAnalyzer.updateCache(); + } if (!isSendFields) { if (!isOutfileQuery) { sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs())); @@ -1018,7 +997,6 @@ public class StmtExecutor implements ProfileWriter { plannerProfile.setQueryFetchResultFinishTime(); } - private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception { TWaitingTxnStatusResult statusResult = null; if (Catalog.getCurrentCatalog().isMaster()) { @@ -1469,7 +1447,7 @@ public class StmtExecutor implements ProfileWriter { context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer()); } - public void sendResult(ResultSet resultSet) throws IOException { + public void sendResultSet(ResultSet resultSet) throws IOException { context.updateReturnRows(resultSet.getResultRows().size()); // Send meta data. sendMetaData(resultSet.getMetaData()); @@ -1503,7 +1481,7 @@ public class StmtExecutor implements ProfileWriter { return; } - sendResult(resultSet); + sendResultSet(resultSet); } private void handleUnlockTablesStmt() { @@ -1569,7 +1547,7 @@ public class StmtExecutor implements ProfileWriter { // create table DdlExecutor.execute(context.getCatalog(), ctasStmt); context.getState().setOk(); - } catch (Exception e) { + } catch (Exception e) { // Maybe our bug LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e); context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index 480650e4e8..a4e4844f0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -208,6 +208,12 @@ public class CacheAnalyzer { LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId)); return CacheMode.None; } + if (enablePartitionCache() && ((OlapScanNode) node).getSelectedPartitionNum() > 1 + && selectStmt.hasGroupByClause()) { + LOG.debug("more than one partition scanned when qeury has agg, partition cache cannot use, queryid {}", + DebugUtil.printId(queryId)); + return CacheMode.None; + } CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node); tblTimeList.add(cTable); } diff --git a/regression-test/data/query/cache/partition_cache.out b/regression-test/data/query/cache/partition_cache.out new file mode 100644 index 0000000000..04875e2448 --- /dev/null +++ b/regression-test/data/query/cache/partition_cache.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !partition_cache -- +2022-05-28 0 +2022-05-29 0 +2022-05-30 0 +2022-06-01 0 +2022-06-02 0 + +-- !partition_cache -- +2022-05-28 0 +2022-05-29 0 +2022-05-30 0 +2022-06-01 0 +2022-06-02 0 diff --git a/regression-test/suites/query/cache/partition_cache.groovy b/regression-test/suites/query/cache/partition_cache.groovy new file mode 100644 index 0000000000..206708a2a2 --- /dev/null +++ b/regression-test/suites/query/cache/partition_cache.groovy @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases is copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate +// and modified by Doris. + +suite("partition_cache") { + def tableName = "test_partition_cache" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')), + PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 32 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + sql """ INSERT INTO ${tableName} VALUES + ("2022-05-27",0), + ("2022-05-28",0), + ("2022-05-29",0), + ("2022-05-30",0), + ("2022-06-01",0), + ("2022-06-02",0) + """ + sql " set enable_partition_cache=true " + + qt_partition_cache """ + select + k1, + sum(k2) as total_pv + from + ${tableName} + where + k1 between '2022-05-28' and '2022-06-30' + group by + k1 + order by + k1; + """ + qt_partition_cache """ + select + k1, + sum(k2) as total_pv + from + ${tableName} + where + k1 between '2022-05-28' and '2022-06-30' + group by + k1 + order by + k1; + """ + sql " set enable_partition_cache=false " +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org