marising commented on a change in pull request #4330: URL: https://github.com/apache/incubator-doris/pull/4330#discussion_r472609245
########## File path: fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java ########## @@ -575,6 +583,78 @@ private void handleSetStmt() { context.getState().setOk(); } + private void sendChannel(MysqlChannel channel, List<CacheProxy.CacheValue> cacheValues, boolean hitAll) + throws Exception { + RowBatch batch = null; + for (CacheBeProxy.CacheValue value : cacheValues) { + batch = value.getRowBatch(); + for (ByteBuffer row : batch.getBatch().getRows()) { + channel.sendOnePacket(row); + } + context.updateReturnRows(batch.getBatch().getRows().size()); + } + if (hitAll) { + if (batch != null) { + statisticsForAuditLog = batch.getQueryStatistics(); + } + context.getState().setEof(); + return; + } + } + + private boolean handleCacheStmt(CacheAnalyzer cacheAnalyzer,MysqlChannel channel) throws Exception { + RowBatch batch = null; + CacheBeProxy.FetchCacheResult cacheResult = cacheAnalyzer.getCacheData(); + CacheMode mode = cacheAnalyzer.getCacheMode(); + if (cacheResult != null) { + isCached = true; + if (cacheAnalyzer.getHitRange() == Cache.HitRange.Full) { + sendChannel(channel, cacheResult.getValueList(), true); + return true; + } + //rewrite sql + if (mode == CacheMode.Partition) { + if (cacheAnalyzer.getHitRange() == Cache.HitRange.Left) { + sendChannel(channel, cacheResult.getValueList(), false); + } + SelectStmt newSelectStmt = cacheAnalyzer.getRewriteStmt(); + newSelectStmt.reset(); + analyzer = new Analyzer(context.getCatalog(), context); + newSelectStmt.analyze(analyzer); + planner = new Planner(); + planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift()); + } + } + + coord = new Coordinator(context, analyzer, planner); + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + coord.exec(); + + while (true) { + batch = coord.getNext(); + if (batch.getBatch() != null) { + cacheAnalyzer.copyRowBatch(batch); + for (ByteBuffer row : 
batch.getBatch().getRows()) { + channel.sendOnePacket(row); + } + context.updateReturnRows(batch.getBatch().getRows().size()); + } + if (batch.isEos()) { + break; + } + } + + if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) { + sendChannel(channel, cacheResult.getValueList(), false); + } + + cacheAnalyzer.updateCache(); Review comment: The updateCache method itself determines whether the background cache needs to be updated: it returns without updating anything when the cache mode is None or NoNeed. ``` public void updateCache() { if (cacheMode == CacheMode.None || cacheMode == CacheMode.NoNeed) { return; } cache.updateCache(); } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org