This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 2e29cffe56 KYLIN-5367  fix update layoutHits error in fusion model
2e29cffe56 is described below

commit 2e29cffe56bf8a9511cc86fabe6bcea6b8d8a260
Author: binbin.zheng <binbin.zh...@kyligence.io>
AuthorDate: Tue Oct 25 22:26:21 2022 +0800

    KYLIN-5367  fix update layoutHits error in fusion model
---
 .../service/task/QueryHistoryTaskScheduler.java    |  30 +++--
 .../task/QueryHistoryTaskSchedulerTest.java        | 122 +++++++++++++++++++++
 .../kylin/metadata/cube/model/NDataflow.java       |   5 +
 .../apache/kylin/metadata/model/FusionModel.java   |   2 +-
 .../kylin/metadata/model/FusionModelManager.java   |  15 +++
 .../apache/kylin/metadata/query/QueryHistory.java  |  12 +-
 .../metadata/cube/model/NDataflowManagerTest.java  |  20 ++++
 .../metadata/streaming/FusionModelManagerTest.java |  24 ++++
 .../kylin/rest/service/AsyncTaskService.java       |   2 +-
 .../kylin/rest/service/QueryCacheManager.java      |   2 +-
 .../kylin/rest/service/QueryHistoryService.java    |  21 ++--
 .../apache/kylin/rest/service/QueryService.java    |   7 ++
 .../rest/service/QueryHistoryServiceTest.java      |  13 ++-
 .../kylin/rest/service/QueryServiceTest.java       |  87 ++++++++++++---
 14 files changed, 318 insertions(+), 44 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
index 1d3664e4ec..eae1ed351b 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
@@ -28,8 +28,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
@@ -224,20 +226,19 @@ public class QueryHistoryTaskScheduler {
         }
 
         private Map<String, DataflowHitCount> 
collectDataflowHitCount(List<QueryHistory> queryHistories) {
-            val dfManager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
             val result = Maps.<String, DataflowHitCount> newHashMap();
             for (QueryHistory queryHistory : queryHistories) {
-                val realizations = queryHistory.transformRealizations();
+                val realizations = queryHistory.transformRealizations(project);
                 if (CollectionUtils.isEmpty(realizations)) {
                     continue;
                 }
-                for (val realization : realizations) {
-                    if (dfManager.getDataflow(realization.getModelId()) == 
null || realization.getLayoutId() == null) {
-                        continue;
-                    }
-                    result.computeIfAbsent(realization.getModelId(), k -> new 
DataflowHitCount());
-                    result.get(realization.getModelId()).dataflowHit += 1;
-                    val layoutHits = 
result.get(realization.getModelId()).getLayoutHits();
+                val realizationList = 
realizations.stream().filter(this::isValidRealization)
+                        .collect(Collectors.toList());
+                for (val realization : realizationList) {
+                    String modelId = realization.getModelId();
+                    result.computeIfAbsent(modelId, k -> new 
DataflowHitCount());
+                    result.get(modelId).dataflowHit += 1;
+                    val layoutHits = result.get(modelId).getLayoutHits();
                     layoutHits.computeIfAbsent(realization.getLayoutId(), k -> 
new FrequencyMap());
                     
layoutHits.get(realization.getLayoutId()).incFrequency(queryHistory.getQueryTime());
                 }
@@ -245,6 +246,12 @@ public class QueryHistoryTaskScheduler {
             return result;
         }
 
+        private boolean isValidRealization(NativeQueryRealization realization) 
{
+            val config = KylinConfig.getInstanceFromEnv();
+            val dfManager = NDataflowManager.getInstance(config, project);
+            return dfManager.getDataflow(realization.getModelId()) != null && 
realization.getLayoutId() != null;
+        }
+
         private Map<TableExtDesc, Integer> 
collectSnapshotHitCount(List<QueryHistory> queryHistories) {
             val tableManager = 
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
             val results = Maps.<TableExtDesc, Integer> newHashMap();
@@ -263,10 +270,13 @@ public class QueryHistoryTaskScheduler {
         }
 
         private void collectModelLastQueryTime(QueryHistory queryHistory, 
Map<String, Long> modelsLastQueryTime) {
-            List<NativeQueryRealization> realizations = 
queryHistory.transformRealizations();
+            List<NativeQueryRealization> realizations = 
queryHistory.transformRealizations(project);
             long queryTime = queryHistory.getQueryTime();
             for (NativeQueryRealization realization : realizations) {
                 String modelId = realization.getModelId();
+                if (StringUtils.isEmpty(modelId)) {
+                    continue;
+                }
                 modelsLastQueryTime.put(modelId, queryTime);
             }
         }
diff --git 
a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
 
b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
index dc4dba454e..18e2a86842 100644
--- 
a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
+++ 
b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
@@ -67,6 +67,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import lombok.val;
+import lombok.var;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(TimeZoneTestRunner.class)
@@ -77,6 +78,8 @@ public class QueryHistoryTaskSchedulerTest extends 
NLocalFileMetadataTestCase {
     private static final String DATAFLOW = 
"89af4ee2-2cdb-4b07-b39e-4c29856309aa";
     private static final String LAYOUT1 = "20000000001";
     private static final String LAYOUT2 = "1000001";
+    private static final String LAYOUT3 = "30001";
+    private static final String LAYOUT4 = "40001";
     private static final Long QUERY_TIME = 1586760398338L;
 
     private QueryHistoryTaskScheduler qhAccelerateScheduler;
@@ -343,6 +346,46 @@ public class QueryHistoryTaskSchedulerTest extends 
NLocalFileMetadataTestCase {
         Assert.assertEquals(16, 
idOffsetManager.get().getStatMetaUpdateOffset());
     }
 
+    @Test
+    public void testUpdateStatMeta() {
+        QueryHistoryTaskScheduler taskScheduler = new 
QueryHistoryTaskScheduler("streaming_test");
+        QueryHistoryTaskScheduler.QueryHistoryMetaUpdateRunner 
metaUpdateRunner = taskScheduler.new QueryHistoryMetaUpdateRunner();
+        NDataflowManager manager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), 
"streaming_test");
+        {
+            var dataflow = 
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+            Assert.assertTrue(dataflow.getLayoutHitCount().isEmpty());
+            ReflectionTestUtils.invokeMethod(metaUpdateRunner, 
"updateStatMeta", batchModelQueryHistory());
+            dataflow = 
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+            Assert.assertEquals(1, countDateFrequency(dataflow, LAYOUT3));
+        }
+        {
+            var batchDataflow = 
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+            var streamingDataflow = 
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+
+            
Assert.assertFalse(batchDataflow.getLayoutHitCount().containsKey(Long.parseLong(LAYOUT4)));
+            
Assert.assertFalse(streamingDataflow.getLayoutHitCount().containsKey(Long.parseLong(LAYOUT3)));
+
+            ReflectionTestUtils.invokeMethod(metaUpdateRunner, 
"updateStatMeta", fusionModelQueryHistory());
+            batchDataflow = 
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+            streamingDataflow = 
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+
+            Assert.assertEquals(1, countDateFrequency(batchDataflow, LAYOUT4));
+            Assert.assertEquals(1, countDateFrequency(streamingDataflow, 
LAYOUT3));
+        }
+        {
+            var streamingDataflow = 
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+            Assert.assertEquals(1, countDateFrequency(streamingDataflow, 
LAYOUT3));
+            ReflectionTestUtils.invokeMethod(metaUpdateRunner, 
"updateStatMeta", streamingModelQueryHistory());
+            streamingDataflow = 
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+            Assert.assertEquals(2, countDateFrequency(streamingDataflow, 
LAYOUT3));
+        }
+    }
+
+    private int countDateFrequency(NDataflow dataflow, String layout) {
+        return 
dataflow.getLayoutHitCount().get(Long.parseLong(layout)).getDateFrequency().values().stream()
+                .mapToInt(Integer::intValue).sum();
+    }
+
     private List<QueryHistory> queryHistories() {
         QueryHistory queryHistory1 = new QueryHistory();
         queryHistory1.setSqlPattern("select * from sql1");
@@ -517,6 +560,85 @@ public class QueryHistoryTaskSchedulerTest extends 
NLocalFileMetadataTestCase {
         return histories;
     }
 
+    /**
+     * sql match batch model of fusion model
+     * @return
+     */
+    private List<QueryHistory> batchModelQueryHistory() {
+        String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";
+        QueryHistory queryHistory1 = new QueryHistory();
+        queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM 
SSB.KAFKA_FUSION");
+        queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+        queryHistory1.setDuration(1000L);
+        queryHistory1.setQueryTime(QUERY_TIME);
+        queryHistory1.setEngineType("NATIVE");
+        QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
+        queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(
+                new QueryMetrics.RealizationMetrics(LAYOUT3, "Agg Index", 
fusionModelId, Lists.newArrayList())));
+        queryHistory1.setQueryHistoryInfo(queryHistoryInfo1);
+        queryHistory1.setId(10);
+        return Lists.newArrayList(queryHistory1);
+    }
+
+    /**
+     * sql match both batch model and streaming model in fusion model
+     * @return
+     */
+    private List<QueryHistory> fusionModelQueryHistory() {
+        String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";
+        String batchModelId = "334671fd-e383-4fc9-b5c2-94fce832f77a";
+        QueryHistory queryHistory1 = new QueryHistory();
+        queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM 
SSB.KAFKA_FUSION");
+        queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+        queryHistory1.setDuration(1000L);
+        queryHistory1.setQueryTime(QUERY_TIME);
+        queryHistory1.setEngineType("NATIVE");
+        QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
+        QueryMetrics.RealizationMetrics streamingRealizationMetric = new 
QueryMetrics.RealizationMetrics(LAYOUT3,
+                "Agg Index", fusionModelId, Lists.newArrayList());
+        streamingRealizationMetric.setStreamingLayout(true);
+        
queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(streamingRealizationMetric,
+                new QueryMetrics.RealizationMetrics(LAYOUT4, "Agg Index", 
batchModelId, Lists.newArrayList())));
+        queryHistory1.setQueryHistoryInfo(queryHistoryInfo1);
+        queryHistory1.setId(11);
+        return Lists.newArrayList(queryHistory1);
+    }
+
+    /**
+     * sql match streaming model in fusion model
+     * @return
+     */
+    private List<QueryHistory> streamingModelQueryHistory() {
+        String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";
+        QueryHistory queryHistory1 = new QueryHistory();
+        queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM 
SSB.KAFKA_FUSION");
+        queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+        queryHistory1.setDuration(1000L);
+        queryHistory1.setQueryTime(QUERY_TIME);
+        queryHistory1.setEngineType("NATIVE");
+        QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
+        QueryMetrics.RealizationMetrics streamingRealizationMetric = new 
QueryMetrics.RealizationMetrics(LAYOUT3,
+                "Agg Index", fusionModelId, Lists.newArrayList());
+        streamingRealizationMetric.setStreamingLayout(true);
+        
queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(streamingRealizationMetric));
+        queryHistory1.setQueryHistoryInfo(queryHistoryInfo1);
+        queryHistory1.setId(10);
+
+        QueryHistory queryHistory2 = new QueryHistory();
+        queryHistory2.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM 
SSB.KAFKA_FUSION");
+        queryHistory2.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+        queryHistory2.setDuration(1000L);
+        queryHistory2.setQueryTime(QUERY_TIME);
+        queryHistory2.setEngineType("NATIVE");
+        QueryHistoryInfo queryHistoryInfo2 = new QueryHistoryInfo();
+        QueryMetrics.RealizationMetrics realizationMetric = new 
QueryMetrics.RealizationMetrics(LAYOUT3,
+                "Agg Index", "error", Lists.newArrayList());
+        
queryHistoryInfo2.setRealizationMetrics(Lists.newArrayList(realizationMetric));
+        queryHistory2.setQueryHistoryInfo(queryHistoryInfo2);
+        queryHistory2.setId(11);
+        return Lists.newArrayList(queryHistory1, queryHistory2);
+    }
+
     int startOffset = 0;
 
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
index c72b4842e5..7a5d81b145 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
@@ -242,6 +242,11 @@ public class NDataflow extends RootPersistentEntity 
implements Serializable, IRe
         return model == null ? null : model.getAlias();
     }
 
+    public String getFusionModelAlias() {
+        NDataModel model = getModel();
+        return model == null ? null : model.getFusionModelAlias();
+    }
+
     @Override
     public Set<TblColRef> getAllColumns() {
         return getIndexPlan().listAllTblColRefs();
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
index d1ce44915d..28719ab1e1 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
@@ -120,7 +120,7 @@ public class FusionModel extends RootPersistentEntity 
implements Serializable {
         NDataModelManager modelManager = NDataModelManager.getInstance(config, 
project);
         for (String modelId : getModelsId()) {
             NDataModel model = modelManager.getDataModelDesc(modelId);
-            if (model.getModelType() == modelType) {
+            if (model != null && model.getModelType() == modelType) {
                 return model;
             }
         }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
index f0623f5fa2..45fdf65aae 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
@@ -18,9 +18,14 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.util.Objects;
+import java.util.Optional;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,4 +98,14 @@ public class FusionModelManager {
 
         return crud.save(desc);
     }
+
+    public String getModelId(NativeQueryRealization realization) {
+        String modelId = realization.getModelId();
+        FusionModel fusionModel = getFusionModel(modelId);
+        if (!realization.isStreamingLayout() && !Objects.isNull(fusionModel)) {
+            NDataModel dataModel = fusionModel.getBatchModel();
+            modelId = 
Optional.ofNullable(dataModel).map(RootPersistentEntity::getId).orElse("");
+        }
+        return modelId;
+    }
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
index c24f1ae908..4414ff2fb4 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.List;
 import java.util.regex.Pattern;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.model.FusionModelManager;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
@@ -31,7 +33,7 @@ import com.google.common.collect.Lists;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.val;
-import org.apache.kylin.common.util.JsonUtil;
+import lombok.extern.slf4j.Slf4j;
 
 @SuppressWarnings("serial")
 @Getter
@@ -192,7 +194,7 @@ public class QueryHistory {
         return queryStatus.equals(QUERY_HISTORY_FAILED);
     }
 
-    public List<NativeQueryRealization> transformRealizations() {
+    public List<NativeQueryRealization> transformRealizations(String project) {
         List<NativeQueryRealization> realizations = Lists.newArrayList();
         if (queryHistoryInfo == null || 
queryHistoryInfo.getRealizationMetrics() == null
                 || queryHistoryInfo.getRealizationMetrics().isEmpty()) {
@@ -200,7 +202,7 @@ public class QueryHistory {
         }
 
         List<QueryMetrics.RealizationMetrics> realizationMetrics = 
queryHistoryInfo.realizationMetrics;
-
+        val fusionModelManager = 
FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
         for (QueryMetrics.RealizationMetrics metrics : realizationMetrics) {
             val realization = new NativeQueryRealization(metrics.modelId,
                     metrics.layoutId == null || 
metrics.layoutId.equals("null") ? null
@@ -210,6 +212,8 @@ public class QueryHistory {
                             : metrics.snapshots);
             realization.setSecondStorage(metrics.isSecondStorage);
             realization.setStreamingLayout(metrics.isStreamingLayout);
+            String modelId = fusionModelManager.getModelId(realization);
+            realization.setModelId(modelId);
             realizations.add(realization);
         }
         return realizations;
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
index b83682d724..67796a1eb5 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
@@ -989,4 +989,24 @@ public class NDataflowManagerTest extends 
NLocalFileMetadataTestCase {
         Assert.assertEquals(5, flatTableDesc2.getUsedColumns().size());
     }
 
+    @Test
+    public void testGetFusionModelAlias() {
+        String streamingModelId = "14e00a6f-d910-14b6-ee67-e0a5775012c4";
+        String batchModelId = "3d69e1c0-0165-c144-7dae-8ae5dc0cf16c";
+        NDataflowManager mgr = NDataflowManager.getInstance(getTestConfig(), 
"streaming_test");
+        Assert.assertEquals("fusion_model", 
mgr.getDataflow(streamingModelId).getFusionModelAlias());
+        Assert.assertEquals("fusion_model", 
mgr.getDataflow(batchModelId).getFusionModelAlias());
+
+        Assert.assertEquals("stream_merge",
+                
mgr.getDataflow("e78a89dd-847f-4574-8afa-8768b4228b72").getFusionModelAlias());
+
+        mgr = NDataflowManager.getInstance(getTestConfig(), projectDefault);
+        Assert.assertEquals("nmodel_basic_inner",
+                
mgr.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a").getFusionModelAlias());
+
+        val modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), projectDefault);
+        modelManager.dropModel("741ca86a-1f13-46da-a59f-95fb68615e3a");
+        
Assert.assertNull(mgr.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a").getFusionModelAlias());
+
+    }
 }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
index 54866b4fdc..c44fdec332 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
@@ -22,6 +22,7 @@ import 
org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.FusionModel;
 import org.apache.kylin.metadata.model.FusionModelManager;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -106,4 +107,27 @@ public class FusionModelManagerTest extends 
NLocalFileMetadataTestCase {
         Assert.assertTrue(streamingModel.fusionModelStreamingPart());
     }
 
+    @Test
+    public void testGetModelId() {
+        String streamingModelId = "14e00a6f-d910-14b6-ee67-e0a5775012c4";
+        String batchModelId = "3d69e1c0-0165-c144-7dae-8ae5dc0cf16c";
+
+        val realization = new NativeQueryRealization();
+        realization.setModelId(streamingModelId);
+        Assert.assertEquals(batchModelId, mgr.getModelId(realization));
+
+        realization.setModelId(streamingModelId);
+        realization.setStreamingLayout(true);
+        Assert.assertEquals(streamingModelId, mgr.getModelId(realization));
+
+        realization.setModelId(batchModelId);
+        realization.setStreamingLayout(false);
+        Assert.assertEquals(batchModelId, mgr.getModelId(realization));
+
+        realization.setModelId(streamingModelId);
+        val modelMgr = NDataModelManager.getInstance(getTestConfig(), PROJECT);
+        modelMgr.dropModel(batchModelId);
+        Assert.assertEquals("", mgr.getModelId(realization));
+    }
+
 }
diff --git 
a/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
 
b/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
index 1dcad68b23..1b26150623 100644
--- 
a/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
+++ 
b/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
@@ -128,7 +128,7 @@ public class AsyncTaskService implements 
AsyncTaskServiceSupporter {
         val noBrokenModels = NDataflowManager.getInstance(kylinConfig, 
project).listUnderliningDataModels().stream()
                 .collect(Collectors.toMap(NDataModel::getAlias, 
RootPersistentEntity::getUuid));
         val dataModelManager = NDataModelManager.getInstance(kylinConfig, 
project);
-        List<NativeQueryRealization> realizations = qh.transformRealizations();
+        List<NativeQueryRealization> realizations = 
qh.transformRealizations(project);
 
         realizations.forEach(realization -> {
             NDataModel nDataModel = 
dataModelManager.getDataModelDesc(realization.getModelId());
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index bc38e2c99a..6f7ecff94b 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -195,7 +195,7 @@ public class QueryCacheManager implements 
CommonQueryCacheSupporter {
             val modelId = nativeQueryRealization.getModelId();
             val dataflow = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                     .getDataflow(modelId);
-            nativeQueryRealization.setModelAlias(dataflow.getModelAlias());
+            
nativeQueryRealization.setModelAlias(dataflow.getFusionModelAlias());
         }
 
         return cached;
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
index f7c3bfc747..7479631821 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
@@ -40,7 +40,6 @@ import java.util.stream.Collectors;
 
 import javax.servlet.http.HttpServletResponse;
 
-import io.kyligence.kap.guava20.shaded.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -48,9 +47,6 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.rest.exception.ForbiddenException;
-import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.common.util.Unsafe;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
@@ -59,6 +55,7 @@ import 
org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.query.NativeQueryRealization;
 import org.apache.kylin.metadata.query.QueryHistory;
 import org.apache.kylin.metadata.query.QueryHistoryDAO;
@@ -66,8 +63,10 @@ import org.apache.kylin.metadata.query.QueryHistoryInfo;
 import org.apache.kylin.metadata.query.QueryHistoryRequest;
 import org.apache.kylin.metadata.query.QueryStatistics;
 import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
+import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.response.NDataModelResponse;
 import org.apache.kylin.rest.response.QueryStatisticsResponse;
+import org.apache.kylin.rest.util.AclEvaluate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -79,6 +78,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import io.kyligence.kap.guava20.shaded.common.collect.ImmutableMap;
 import lombok.val;
 
 @Component("queryHistoryService")
@@ -191,13 +191,16 @@ public class QueryHistoryService extends BasicService 
implements AsyncTaskQueryH
                 .listUnderliningDataModels().stream()
                 .collect(Collectors.toMap(NDataModel::getAlias, 
RootPersistentEntity::getUuid));
 
-        val dataModelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        val indexPlanManager = 
NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        List<NativeQueryRealization> realizations = 
query.transformRealizations();
+        val config = KylinConfig.getInstanceFromEnv();
+        val indexPlanManager = NIndexPlanManager.getInstance(config, project);
+        val modelManager = NDataModelManager.getInstance(config, project);
+
+        List<NativeQueryRealization> realizations = 
query.transformRealizations(project);
 
         realizations.forEach(realization -> {
-            NDataModel nDataModel = 
dataModelManager.getDataModelDesc(realization.getModelId());
-            if (noBrokenModels.containsValue(realization.getModelId())) {
+            String modelId = realization.getModelId();
+            NDataModel nDataModel = modelManager.getDataModelDesc(modelId);
+            if (noBrokenModels.containsValue(modelId)) {
                 NDataModelResponse model = (NDataModelResponse) modelService
                         .updateResponseAcl(new NDataModelResponse(nDataModel), 
project);
                 realization.setModelAlias(model.getFusionModelAlias());
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 69d75265df..b1361132f9 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -83,6 +83,7 @@ import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.acl.AclTCR;
 import org.apache.kylin.metadata.acl.AclTCRManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.kylin.metadata.model.FusionModelManager;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -636,6 +637,12 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
                         QueryContext.currentMetrics().getTotalScanRows());
             }
 
+            val fusionManager = 
FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(),
+                    sqlRequest.getProject());
+            if 
(CollectionUtils.isNotEmpty(sqlResponse.getNativeRealizations())) {
+                sqlResponse.getNativeRealizations().stream()
+                        .forEach(realization -> 
realization.setModelId(fusionManager.getModelId(realization)));
+            }
             //check query result row count
             
NCircuitBreaker.verifyQueryResultRowCount(sqlResponse.getResultRowCount());
 
diff --git 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
index da763292c8..a53f9af8e7 100644
--- 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
+++ 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
@@ -902,12 +902,17 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
         QueryHistory query2 = new QueryHistory();
         query2.setSql("select * from test_table_2");
 
+        QueryHistory query3 = new QueryHistory();
+        query3.setSql("select * from test_table_3");
+
         QueryMetrics.RealizationMetrics metrics1 = new QueryMetrics.RealizationMetrics("1", "Agg Index",
                 "b05034a8-c037-416b-aa26-9e6b4a41ee40", Lists.newArrayList(new String[] {}));
         QueryMetrics.RealizationMetrics metrics2 = new QueryMetrics.RealizationMetrics("1", "Agg Index",
                 "334671fd-e383-4fc9-b5c2-94fce832f77a", Lists.newArrayList(new String[] {}));
         QueryMetrics.RealizationMetrics metrics3 = new QueryMetrics.RealizationMetrics("1", "Agg Index",
                 "554671fd-e383-4fc9-b5c2-94fce832f77a", Lists.newArrayList(new String[] {}));
+        QueryMetrics.RealizationMetrics metrics4 = new QueryMetrics.RealizationMetrics("1", "Agg Index",
+                "b05034a8-c037-416b-aa26-9e6b4a41ee40", Lists.newArrayList(new String[] {}));
 
         QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
         queryHistoryInfo1.setRealizationMetrics(
@@ -917,8 +922,12 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
         QueryHistoryInfo queryHistoryInfo2 = new QueryHistoryInfo();
         queryHistoryInfo2.setRealizationMetrics(Lists.newArrayList(new QueryMetrics.RealizationMetrics[] { metrics3 }));
         query2.setQueryHistoryInfo(queryHistoryInfo2);
+
+        QueryHistoryInfo queryHistoryInfo3 = new QueryHistoryInfo();
+        queryHistoryInfo3.setRealizationMetrics(Lists.newArrayList(new QueryMetrics.RealizationMetrics[] { metrics4 }));
+        query3.setQueryHistoryInfo(queryHistoryInfo3);
         RDBMSQueryHistoryDAO queryHistoryDAO = Mockito.mock(RDBMSQueryHistoryDAO.class);
-        Mockito.doReturn(Lists.newArrayList(query1, query2)).when(queryHistoryDAO)
+        Mockito.doReturn(Lists.newArrayList(query1, query2, query3)).when(queryHistoryDAO)
                 .getQueryHistoriesByConditions(Mockito.any(), Mockito.anyInt(), Mockito.anyInt());
         Mockito.doReturn(10L).when(queryHistoryDAO).getQueryHistoriesSize(Mockito.any(), Mockito.anyString());
         Mockito.doReturn(queryHistoryDAO).when(queryHistoryService).getQueryHistoryDao();
@@ -930,6 +939,8 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase {
         Assert.assertEquals("streaming_test",
                 queryHistories.get(0).getNativeQueryRealizations().get(1).getModelAlias());
         Assert.assertEquals("batch", queryHistories.get(1).getNativeQueryRealizations().get(0).getModelAlias());
+        Assert.assertEquals("334671fd-e383-4fc9-b5c2-94fce832f77a",
+                queryHistories.get(2).getNativeQueryRealizations().get(0).getModelId());
     }
 
     @Test
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 3a6f6670da..fec084ee69 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -586,6 +586,39 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
         mockQueryWithSqlMassage();
     }
 
+    private void mockOLAPContextWithBatchPart() throws Exception {
+        val modelManager = Mockito
+                .spy(NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), "streaming_test"));
+
+        Mockito.doReturn(modelManager).when(queryService).getManager(NDataModelManager.class, "streaming_test");
+        // mock agg index realization
+        OLAPContext aggMock = new OLAPContext(1);
+        NDataModel mockModel1 = Mockito.spy(new NDataModel());
+        Mockito.when(mockModel1.getUuid()).thenReturn("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+        Mockito.when(mockModel1.getAlias()).thenReturn("model_streaming");
+        Mockito.doReturn(mockModel1).when(modelManager).getDataModelDesc("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+
+        IRealization batchRealization = Mockito.mock(IRealization.class);
+        Mockito.when(batchRealization.getUuid()).thenReturn("cd2b9a23-699c-4699-b0dd-38c9412b3dfd");
+
+        HybridRealization hybridRealization = Mockito.mock(HybridRealization.class);
+        Mockito.when(hybridRealization.getModel()).thenReturn(mockModel1);
+        Mockito.when(hybridRealization.getBatchRealization()).thenReturn(batchRealization);
+
+        aggMock.realization = hybridRealization;
+        IndexEntity mockIndexEntity1 = new IndexEntity();
+        mockIndexEntity1.setId(1);
+        LayoutEntity mockLayout1 = new LayoutEntity();
+        mockLayout1.setIndex(mockIndexEntity1);
+        aggMock.storageContext.setCandidate(new NLayoutCandidate(mockLayout1));
+        aggMock.storageContext.setLayoutId(20001L);
+        aggMock.storageContext.setPrunedSegments(Lists.newArrayList(new NDataSegment()));
+        OLAPContext.registerContext(aggMock);
+
+        Mockito.doNothing().when(queryService).clearThreadLocalContexts();
+        mockQueryWithSqlMassage();
+    }
+
     private void mockOLAPContextWithStreaming() throws Exception {
         val modelManager = Mockito.spy(NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), "demo"));
 
@@ -1991,28 +2024,48 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase {
     @Test
     public void testQueryContextWithFusionModel() throws Exception {
         final String project = "streaming_test";
-        final String sql = "select count(*) from SSB_STREAMING";
 
-        stubQueryConnection(sql, project);
-        mockOLAPContextWithHybrid();
+        {
+            final String sql = "select count(*) from SSB_STREAMING";
 
-        final SQLRequest request = new SQLRequest();
-        request.setProject(project);
-        request.setSql(sql);
-        Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService);
-        SQLResponse sqlResponse = queryService.doQueryWithCache(request);
+            stubQueryConnection(sql, project);
+            mockOLAPContextWithHybrid();
 
-        Assert.assertEquals(2, sqlResponse.getNativeRealizations().size());
+            final SQLRequest request = new SQLRequest();
+            request.setProject(project);
+            request.setSql(sql);
+            Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService);
+            SQLResponse sqlResponse = queryService.doQueryWithCache(request);
 
-        Assert.assertEquals("4965c827-fbb4-4ea1-a744-3f341a3b030d",
-                sqlResponse.getNativeRealizations().get(0).getModelId());
-        Assert.assertEquals((Long) 10001L, sqlResponse.getNativeRealizations().get(0).getLayoutId());
-        Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd",
-                sqlResponse.getNativeRealizations().get(1).getModelId());
-        Assert.assertEquals((Long) 20001L, sqlResponse.getNativeRealizations().get(1).getLayoutId());
+            Assert.assertEquals(2, sqlResponse.getNativeRealizations().size());
 
-        Assert.assertTrue(sqlResponse.getNativeRealizations().get(0).isStreamingLayout());
-        Assert.assertFalse(sqlResponse.getNativeRealizations().get(1).isStreamingLayout());
+            Assert.assertEquals("4965c827-fbb4-4ea1-a744-3f341a3b030d",
+                    sqlResponse.getNativeRealizations().get(0).getModelId());
+            Assert.assertEquals((Long) 10001L, sqlResponse.getNativeRealizations().get(0).getLayoutId());
+            Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd",
+                    sqlResponse.getNativeRealizations().get(1).getModelId());
+            Assert.assertEquals((Long) 20001L, sqlResponse.getNativeRealizations().get(1).getLayoutId());
+
+            Assert.assertTrue(sqlResponse.getNativeRealizations().get(0).isStreamingLayout());
+            Assert.assertFalse(sqlResponse.getNativeRealizations().get(1).isStreamingLayout());
+        }
+        {
+            final String sql = "select count(1) from SSB_STREAMING";
+
+            stubQueryConnection(sql, project);
+            mockOLAPContextWithBatchPart();
+
+            final SQLRequest request = new SQLRequest();
+            request.setProject(project);
+            request.setSql(sql);
+            Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService);
+            SQLResponse sqlResponse = queryService.doQueryWithCache(request);
+
+            Assert.assertEquals(1, sqlResponse.getNativeRealizations().size());
+            Assert.assertFalse(sqlResponse.getNativeRealizations().get(0).isStreamingLayout());
+            Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd",
+                    sqlResponse.getNativeRealizations().get(0).getModelId());
+        }
     }
 
     @Test


Reply via email to