This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new 2e29cffe56 KYLIN-5367 fix updata layoutHits error in fusion model 2e29cffe56 is described below commit 2e29cffe56bf8a9511cc86fabe6bcea6b8d8a260 Author: binbin.zheng <binbin.zh...@kyligence.io> AuthorDate: Tue Oct 25 22:26:21 2022 +0800 KYLIN-5367 fix updata layoutHits error in fusion model --- .../service/task/QueryHistoryTaskScheduler.java | 30 +++-- .../task/QueryHistoryTaskSchedulerTest.java | 122 +++++++++++++++++++++ .../kylin/metadata/cube/model/NDataflow.java | 5 + .../apache/kylin/metadata/model/FusionModel.java | 2 +- .../kylin/metadata/model/FusionModelManager.java | 15 +++ .../apache/kylin/metadata/query/QueryHistory.java | 12 +- .../metadata/cube/model/NDataflowManagerTest.java | 20 ++++ .../metadata/streaming/FusionModelManagerTest.java | 24 ++++ .../kylin/rest/service/AsyncTaskService.java | 2 +- .../kylin/rest/service/QueryCacheManager.java | 2 +- .../kylin/rest/service/QueryHistoryService.java | 21 ++-- .../apache/kylin/rest/service/QueryService.java | 7 ++ .../rest/service/QueryHistoryServiceTest.java | 13 ++- .../kylin/rest/service/QueryServiceTest.java | 87 ++++++++++++--- 14 files changed, 318 insertions(+), 44 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java index 1d3664e4ec..eae1ed351b 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java @@ -28,8 +28,10 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ExecutorServiceUtil; import org.apache.kylin.common.util.NamedThreadFactory; @@ -224,20 +226,19 @@ public class QueryHistoryTaskScheduler { } private Map<String, DataflowHitCount> collectDataflowHitCount(List<QueryHistory> queryHistories) { - val dfManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project); val result = Maps.<String, DataflowHitCount> newHashMap(); for (QueryHistory queryHistory : queryHistories) { - val realizations = queryHistory.transformRealizations(); + val realizations = queryHistory.transformRealizations(project); if (CollectionUtils.isEmpty(realizations)) { continue; } - for (val realization : realizations) { - if (dfManager.getDataflow(realization.getModelId()) == null || realization.getLayoutId() == null) { - continue; - } - result.computeIfAbsent(realization.getModelId(), k -> new DataflowHitCount()); - result.get(realization.getModelId()).dataflowHit += 1; - val layoutHits = result.get(realization.getModelId()).getLayoutHits(); + val realizationList = realizations.stream().filter(this::isValidRealization) + .collect(Collectors.toList()); + for (val realization : realizationList) { + String modelId = realization.getModelId(); + result.computeIfAbsent(modelId, k -> new DataflowHitCount()); + result.get(modelId).dataflowHit += 1; + val layoutHits = result.get(modelId).getLayoutHits(); layoutHits.computeIfAbsent(realization.getLayoutId(), k -> new FrequencyMap()); layoutHits.get(realization.getLayoutId()).incFrequency(queryHistory.getQueryTime()); } @@ -245,6 +246,12 @@ public class QueryHistoryTaskScheduler { return result; } + private boolean isValidRealization(NativeQueryRealization realization) { + val config = KylinConfig.getInstanceFromEnv(); + val dfManager = NDataflowManager.getInstance(config, project); + return dfManager.getDataflow(realization.getModelId()) != null && realization.getLayoutId() != null; + } + private Map<TableExtDesc, Integer> collectSnapshotHitCount(List<QueryHistory> queryHistories) { val tableManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project); val results = Maps.<TableExtDesc, Integer> newHashMap(); @@ -263,10 +270,13 @@ public class QueryHistoryTaskScheduler { } private void collectModelLastQueryTime(QueryHistory queryHistory, Map<String, Long> modelsLastQueryTime) { - List<NativeQueryRealization> realizations = queryHistory.transformRealizations(); + List<NativeQueryRealization> realizations = queryHistory.transformRealizations(project); long queryTime = queryHistory.getQueryTime(); for (NativeQueryRealization realization : realizations) { String modelId = realization.getModelId(); + if (StringUtils.isEmpty(modelId)) { + continue; + } modelsLastQueryTime.put(modelId, queryTime); } } diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java index dc4dba454e..18e2a86842 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java @@ -67,6 +67,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.val; +import lombok.var; @RunWith(PowerMockRunner.class) @PowerMockRunnerDelegate(TimeZoneTestRunner.class) @@ -77,6 +78,8 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { private static final String DATAFLOW = "89af4ee2-2cdb-4b07-b39e-4c29856309aa"; private static final String LAYOUT1 = "20000000001"; private static final String LAYOUT2 = "1000001"; + private static final String LAYOUT3 = "30001"; + private static final String LAYOUT4 = "40001"; private static final Long QUERY_TIME = 1586760398338L; private QueryHistoryTaskScheduler qhAccelerateScheduler; @@ -343,6 +346,46 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { Assert.assertEquals(16, idOffsetManager.get().getStatMetaUpdateOffset()); } + @Test + public void testUpdateStatMeta() { + QueryHistoryTaskScheduler taskScheduler = new QueryHistoryTaskScheduler("streaming_test"); + QueryHistoryTaskScheduler.QueryHistoryMetaUpdateRunner metaUpdateRunner = taskScheduler.new QueryHistoryMetaUpdateRunner(); + NDataflowManager manager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), "streaming_test"); + { + var dataflow = manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a"); + Assert.assertTrue(dataflow.getLayoutHitCount().isEmpty()); + ReflectionTestUtils.invokeMethod(metaUpdateRunner, "updateStatMeta", batchModelQueryHistory()); + dataflow = manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a"); + Assert.assertEquals(1, countDateFrequency(dataflow, LAYOUT3)); + } + { + var batchDataflow = manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a"); + var streamingDataflow = manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40"); + + Assert.assertFalse(batchDataflow.getLayoutHitCount().containsKey(Long.parseLong(LAYOUT4))); + Assert.assertFalse(streamingDataflow.getLayoutHitCount().containsKey(Long.parseLong(LAYOUT3))); + + ReflectionTestUtils.invokeMethod(metaUpdateRunner, "updateStatMeta", fusionModelQueryHistory()); + batchDataflow = manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a"); + streamingDataflow = manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40"); + + Assert.assertEquals(1, countDateFrequency(batchDataflow, LAYOUT4)); + Assert.assertEquals(1, countDateFrequency(streamingDataflow, LAYOUT3)); + } + { + var streamingDataflow = manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40"); + Assert.assertEquals(1, countDateFrequency(streamingDataflow, LAYOUT3)); + ReflectionTestUtils.invokeMethod(metaUpdateRunner, "updateStatMeta", streamingModelQueryHistory()); + streamingDataflow = manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40"); + Assert.assertEquals(2, countDateFrequency(streamingDataflow, LAYOUT3)); + } + } + + private int countDateFrequency(NDataflow dataflow, String layout) { + return dataflow.getLayoutHitCount().get(Long.parseLong(layout)).getDateFrequency().values().stream() + .mapToInt(Integer::intValue).sum(); + } + private List<QueryHistory> queryHistories() { QueryHistory queryHistory1 = new QueryHistory(); queryHistory1.setSqlPattern("select * from sql1"); @@ -517,6 +560,85 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { return histories; } + /** + * sql match batch model of fusion model + * @return + */ + private List<QueryHistory> batchModelQueryHistory() { + String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40"; + QueryHistory queryHistory1 = new QueryHistory(); + queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM SSB.KAFKA_FUSION"); + queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED); + queryHistory1.setDuration(1000L); + queryHistory1.setQueryTime(QUERY_TIME); + queryHistory1.setEngineType("NATIVE"); + QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo(); + queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList( + new QueryMetrics.RealizationMetrics(LAYOUT3, "Agg Index", fusionModelId, Lists.newArrayList()))); + queryHistory1.setQueryHistoryInfo(queryHistoryInfo1); + queryHistory1.setId(10); + return Lists.newArrayList(queryHistory1); + } + + /** + * sql match both batch model and streaming model in fusion model + * @return + */ + private List<QueryHistory> fusionModelQueryHistory() { + String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40"; + String batchModelId = "334671fd-e383-4fc9-b5c2-94fce832f77a"; + QueryHistory queryHistory1 = new QueryHistory(); + queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM SSB.KAFKA_FUSION"); + queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED); + queryHistory1.setDuration(1000L); + queryHistory1.setQueryTime(QUERY_TIME); + queryHistory1.setEngineType("NATIVE"); + QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo(); + QueryMetrics.RealizationMetrics streamingRealizationMetric = new QueryMetrics.RealizationMetrics(LAYOUT3, + "Agg Index", fusionModelId, Lists.newArrayList()); + streamingRealizationMetric.setStreamingLayout(true); + queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(streamingRealizationMetric, + new QueryMetrics.RealizationMetrics(LAYOUT4, "Agg Index", batchModelId, Lists.newArrayList()))); + queryHistory1.setQueryHistoryInfo(queryHistoryInfo1); + queryHistory1.setId(11); + return Lists.newArrayList(queryHistory1); + } + + /** + * sql match streaming model in fusion model + * @return + */ + private List<QueryHistory> streamingModelQueryHistory() { + String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40"; + QueryHistory queryHistory1 = new QueryHistory(); + queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM SSB.KAFKA_FUSION"); + queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED); + queryHistory1.setDuration(1000L); + queryHistory1.setQueryTime(QUERY_TIME); + queryHistory1.setEngineType("NATIVE"); + QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo(); + QueryMetrics.RealizationMetrics streamingRealizationMetric = new QueryMetrics.RealizationMetrics(LAYOUT3, + "Agg Index", fusionModelId, Lists.newArrayList()); + streamingRealizationMetric.setStreamingLayout(true); + queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(streamingRealizationMetric)); + queryHistory1.setQueryHistoryInfo(queryHistoryInfo1); + queryHistory1.setId(10); + + QueryHistory queryHistory2 = new QueryHistory(); + queryHistory2.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM SSB.KAFKA_FUSION"); + queryHistory2.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED); + queryHistory2.setDuration(1000L); + queryHistory2.setQueryTime(QUERY_TIME); + queryHistory2.setEngineType("NATIVE"); + QueryHistoryInfo queryHistoryInfo2 = new QueryHistoryInfo(); + QueryMetrics.RealizationMetrics realizationMetric = new QueryMetrics.RealizationMetrics(LAYOUT3, + "Agg Index", "error", Lists.newArrayList()); + queryHistoryInfo2.setRealizationMetrics(Lists.newArrayList(realizationMetric)); + queryHistory2.setQueryHistoryInfo(queryHistoryInfo2); + queryHistory2.setId(11); + return Lists.newArrayList(queryHistory1, queryHistory2); + } + int startOffset = 0; } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java index c72b4842e5..7a5d81b145 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java @@ -242,6 +242,11 @@ public class NDataflow extends RootPersistentEntity implements Serializable, IRe return model == null ? null : model.getAlias(); } + public String getFusionModelAlias() { + NDataModel model = getModel(); + return model == null ? null : model.getFusionModelAlias(); + } + @Override public Set<TblColRef> getAllColumns() { return getIndexPlan().listAllTblColRefs(); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java index d1ce44915d..28719ab1e1 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java @@ -120,7 +120,7 @@ public class FusionModel extends RootPersistentEntity implements Serializable { NDataModelManager modelManager = NDataModelManager.getInstance(config, project); for (String modelId : getModelsId()) { NDataModel model = modelManager.getDataModelDesc(modelId); - if (model.getModelType() == modelType) { + if (model != null && model.getModelType() == modelType) { return model; } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java index f0623f5fa2..45fdf65aae 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java @@ -18,9 +18,14 @@ package org.apache.kylin.metadata.model; +import java.util.Objects; +import java.util.Optional; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.cachesync.CachedCrudAssist; +import org.apache.kylin.metadata.query.NativeQueryRealization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,4 +98,14 @@ public class FusionModelManager { return crud.save(desc); } + + public String getModelId(NativeQueryRealization realization) { + String modelId = realization.getModelId(); + FusionModel fusionModel = getFusionModel(modelId); + if (!realization.isStreamingLayout() && !Objects.isNull(fusionModel)) { + NDataModel dataModel = fusionModel.getBatchModel(); + modelId = Optional.ofNullable(dataModel).map(RootPersistentEntity::getId).orElse(""); + } + return modelId; + } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java index c24f1ae908..4414ff2fb4 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.util.List; import java.util.regex.Pattern; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.metadata.model.FusionModelManager; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; @@ -31,7 +33,7 @@ import com.google.common.collect.Lists; import lombok.Getter; import lombok.Setter; import lombok.val; -import org.apache.kylin.common.util.JsonUtil; +import lombok.extern.slf4j.Slf4j; @SuppressWarnings("serial") @Getter @@ -192,7 +194,7 @@ public class QueryHistory { return queryStatus.equals(QUERY_HISTORY_FAILED); } - public List<NativeQueryRealization> transformRealizations() { + public List<NativeQueryRealization> transformRealizations(String project) { List<NativeQueryRealization> realizations = Lists.newArrayList(); if (queryHistoryInfo == null || queryHistoryInfo.getRealizationMetrics() == null || queryHistoryInfo.getRealizationMetrics().isEmpty()) { @@ -200,7 +202,7 @@ public class QueryHistory { } List<QueryMetrics.RealizationMetrics> realizationMetrics = queryHistoryInfo.realizationMetrics; - + val fusionModelManager = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project); for (QueryMetrics.RealizationMetrics metrics : realizationMetrics) { val realization = new NativeQueryRealization(metrics.modelId, metrics.layoutId == null || metrics.layoutId.equals("null") ? null @@ -210,6 +212,8 @@ public class QueryHistory { : metrics.snapshots); realization.setSecondStorage(metrics.isSecondStorage); realization.setStreamingLayout(metrics.isStreamingLayout); + String modelId = fusionModelManager.getModelId(realization); + realization.setModelId(modelId); realizations.add(realization); } return realizations; diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java index b83682d724..67796a1eb5 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java @@ -989,4 +989,24 @@ public class NDataflowManagerTest extends NLocalFileMetadataTestCase { Assert.assertEquals(5, flatTableDesc2.getUsedColumns().size()); } + @Test + public void testGetFusionModelAlias() { + String streamingModelId = "14e00a6f-d910-14b6-ee67-e0a5775012c4"; + String batchModelId = "3d69e1c0-0165-c144-7dae-8ae5dc0cf16c"; + NDataflowManager mgr = NDataflowManager.getInstance(getTestConfig(), "streaming_test"); + Assert.assertEquals("fusion_model", mgr.getDataflow(streamingModelId).getFusionModelAlias()); + Assert.assertEquals("fusion_model", mgr.getDataflow(batchModelId).getFusionModelAlias()); + + Assert.assertEquals("stream_merge", + mgr.getDataflow("e78a89dd-847f-4574-8afa-8768b4228b72").getFusionModelAlias()); + + mgr = NDataflowManager.getInstance(getTestConfig(), projectDefault); + Assert.assertEquals("nmodel_basic_inner", + mgr.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a").getFusionModelAlias()); + + val modelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), projectDefault); + modelManager.dropModel("741ca86a-1f13-46da-a59f-95fb68615e3a"); + Assert.assertNull(mgr.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a").getFusionModelAlias()); + + } } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java index 54866b4fdc..c44fdec332 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java @@ -22,6 +22,7 @@ import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.model.FusionModel; import org.apache.kylin.metadata.model.FusionModelManager; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.query.NativeQueryRealization; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -106,4 +107,27 @@ public class FusionModelManagerTest extends NLocalFileMetadataTestCase { Assert.assertTrue(streamingModel.fusionModelStreamingPart()); } + @Test + public void testGetModelId() { + String streamingModelId = "14e00a6f-d910-14b6-ee67-e0a5775012c4"; + String batchModelId = "3d69e1c0-0165-c144-7dae-8ae5dc0cf16c"; + + val realization = new NativeQueryRealization(); + realization.setModelId(streamingModelId); + Assert.assertEquals(batchModelId, mgr.getModelId(realization)); + + realization.setModelId(streamingModelId); + realization.setStreamingLayout(true); + Assert.assertEquals(streamingModelId, mgr.getModelId(realization)); + + realization.setModelId(batchModelId); + realization.setStreamingLayout(false); + Assert.assertEquals(batchModelId, mgr.getModelId(realization)); + + realization.setModelId(streamingModelId); + val modelMgr = NDataModelManager.getInstance(getTestConfig(), PROJECT); + modelMgr.dropModel(batchModelId); + Assert.assertEquals("", mgr.getModelId(realization)); + } + } diff --git a/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java b/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java index 1dcad68b23..1b26150623 100644 --- a/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java +++ b/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java @@ -128,7 +128,7 @@ public class AsyncTaskService implements AsyncTaskServiceSupporter { val noBrokenModels = NDataflowManager.getInstance(kylinConfig, project).listUnderliningDataModels().stream() .collect(Collectors.toMap(NDataModel::getAlias, RootPersistentEntity::getUuid)); val dataModelManager = NDataModelManager.getInstance(kylinConfig, project); - List<NativeQueryRealization> realizations = qh.transformRealizations(); + List<NativeQueryRealization> realizations = qh.transformRealizations(project); realizations.forEach(realization -> { NDataModel nDataModel = dataModelManager.getDataModelDesc(realization.getModelId()); diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java index bc38e2c99a..6f7ecff94b 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java @@ -195,7 +195,7 @@ public class QueryCacheManager implements CommonQueryCacheSupporter { val modelId = nativeQueryRealization.getModelId(); val dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project) .getDataflow(modelId); - nativeQueryRealization.setModelAlias(dataflow.getModelAlias()); + nativeQueryRealization.setModelAlias(dataflow.getFusionModelAlias()); } return cached; diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java index f7c3bfc747..7479631821 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java @@ -40,7 +40,6 @@ import java.util.stream.Collectors; import javax.servlet.http.HttpServletResponse; -import io.kyligence.kap.guava20.shaded.common.collect.ImmutableMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.apache.kylin.common.KylinConfig; @@ -48,9 +47,6 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.TimeUtil; -import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.rest.exception.ForbiddenException; -import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.common.util.Unsafe; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; @@ -59,6 +55,7 @@ import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.query.NativeQueryRealization; import org.apache.kylin.metadata.query.QueryHistory; import org.apache.kylin.metadata.query.QueryHistoryDAO; @@ -66,8 +63,10 @@ import org.apache.kylin.metadata.query.QueryHistoryInfo; import org.apache.kylin.metadata.query.QueryHistoryRequest; import org.apache.kylin.metadata.query.QueryStatistics; import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO; +import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.response.NDataModelResponse; import org.apache.kylin.rest.response.QueryStatisticsResponse; +import org.apache.kylin.rest.util.AclEvaluate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -79,6 +78,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.kyligence.kap.guava20.shaded.common.collect.ImmutableMap; import lombok.val; @Component("queryHistoryService") @@ -191,13 +191,16 @@ public class QueryHistoryService extends BasicService implements AsyncTaskQueryH .listUnderliningDataModels().stream() .collect(Collectors.toMap(NDataModel::getAlias, RootPersistentEntity::getUuid)); - val dataModelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - val indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - List<NativeQueryRealization> realizations = query.transformRealizations(); + val config = KylinConfig.getInstanceFromEnv(); + val indexPlanManager = NIndexPlanManager.getInstance(config, project); + val modelManager = NDataModelManager.getInstance(config, project); + + List<NativeQueryRealization> realizations = query.transformRealizations(project); realizations.forEach(realization -> { - NDataModel nDataModel = dataModelManager.getDataModelDesc(realization.getModelId()); - if (noBrokenModels.containsValue(realization.getModelId())) { + String modelId = realization.getModelId(); + NDataModel nDataModel = modelManager.getDataModelDesc(modelId); + if (noBrokenModels.containsValue(modelId)) { NDataModelResponse model = (NDataModelResponse) modelService .updateResponseAcl(new NDataModelResponse(nDataModel), project); realization.setModelAlias(model.getFusionModelAlias()); diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 69d75265df..b1361132f9 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -83,6 +83,7 @@ import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.acl.AclTCR; import org.apache.kylin.metadata.acl.AclTCRManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; +import org.apache.kylin.metadata.model.FusionModelManager; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.NDataModel; @@ -636,6 +637,12 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup QueryContext.currentMetrics().getTotalScanRows()); } + val fusionManager = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), + sqlRequest.getProject()); + if (CollectionUtils.isNotEmpty(sqlResponse.getNativeRealizations())) { + sqlResponse.getNativeRealizations().stream() + .forEach(realization -> realization.setModelId(fusionManager.getModelId(realization))); + } //check query result row count NCircuitBreaker.verifyQueryResultRowCount(sqlResponse.getResultRowCount()); diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java index da763292c8..a53f9af8e7 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java @@ -902,12 +902,17 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase { QueryHistory query2 = new QueryHistory(); query2.setSql("select * from test_table_2"); + QueryHistory query3 = new QueryHistory(); + query3.setSql("select * from test_table_3"); + QueryMetrics.RealizationMetrics metrics1 = new QueryMetrics.RealizationMetrics("1", "Agg Index", "b05034a8-c037-416b-aa26-9e6b4a41ee40", Lists.newArrayList(new String[] {})); QueryMetrics.RealizationMetrics metrics2 = new QueryMetrics.RealizationMetrics("1", "Agg Index", "334671fd-e383-4fc9-b5c2-94fce832f77a", Lists.newArrayList(new String[] {})); QueryMetrics.RealizationMetrics metrics3 = new QueryMetrics.RealizationMetrics("1", "Agg Index", "554671fd-e383-4fc9-b5c2-94fce832f77a", Lists.newArrayList(new String[] {})); + QueryMetrics.RealizationMetrics metrics4 = new QueryMetrics.RealizationMetrics("1", "Agg Index", + "b05034a8-c037-416b-aa26-9e6b4a41ee40", Lists.newArrayList(new String[] {})); QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo(); queryHistoryInfo1.setRealizationMetrics( @@ -917,8 +922,12 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase { QueryHistoryInfo queryHistoryInfo2 = new QueryHistoryInfo(); queryHistoryInfo2.setRealizationMetrics(Lists.newArrayList(new QueryMetrics.RealizationMetrics[] { metrics3 })); query2.setQueryHistoryInfo(queryHistoryInfo2); + + QueryHistoryInfo queryHistoryInfo3 = new QueryHistoryInfo(); + queryHistoryInfo3.setRealizationMetrics(Lists.newArrayList(new QueryMetrics.RealizationMetrics[] { metrics4 })); + query3.setQueryHistoryInfo(queryHistoryInfo3); RDBMSQueryHistoryDAO queryHistoryDAO = Mockito.mock(RDBMSQueryHistoryDAO.class); - Mockito.doReturn(Lists.newArrayList(query1, query2)).when(queryHistoryDAO) + Mockito.doReturn(Lists.newArrayList(query1, query2, query3)).when(queryHistoryDAO) .getQueryHistoriesByConditions(Mockito.any(), Mockito.anyInt(), Mockito.anyInt()); Mockito.doReturn(10L).when(queryHistoryDAO).getQueryHistoriesSize(Mockito.any(), Mockito.anyString()); Mockito.doReturn(queryHistoryDAO).when(queryHistoryService).getQueryHistoryDao(); @@ -930,6 +939,8 @@ public class QueryHistoryServiceTest extends NLocalFileMetadataTestCase { Assert.assertEquals("streaming_test", queryHistories.get(0).getNativeQueryRealizations().get(1).getModelAlias()); Assert.assertEquals("batch", queryHistories.get(1).getNativeQueryRealizations().get(0).getModelAlias()); + Assert.assertEquals("334671fd-e383-4fc9-b5c2-94fce832f77a", + queryHistories.get(2).getNativeQueryRealizations().get(0).getModelId()); } @Test diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 3a6f6670da..fec084ee69 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -586,6 +586,39 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { mockQueryWithSqlMassage(); } + private void mockOLAPContextWithBatchPart() throws Exception { + val modelManager = Mockito + .spy(NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), "streaming_test")); + + Mockito.doReturn(modelManager).when(queryService).getManager(NDataModelManager.class, "streaming_test"); + // mock agg index realization + OLAPContext aggMock = new OLAPContext(1); + NDataModel mockModel1 = Mockito.spy(new NDataModel()); + Mockito.when(mockModel1.getUuid()).thenReturn("4965c827-fbb4-4ea1-a744-3f341a3b030d"); + Mockito.when(mockModel1.getAlias()).thenReturn("model_streaming"); + Mockito.doReturn(mockModel1).when(modelManager).getDataModelDesc("4965c827-fbb4-4ea1-a744-3f341a3b030d"); + + IRealization batchRealization = Mockito.mock(IRealization.class); + Mockito.when(batchRealization.getUuid()).thenReturn("cd2b9a23-699c-4699-b0dd-38c9412b3dfd"); + + HybridRealization hybridRealization = Mockito.mock(HybridRealization.class); + Mockito.when(hybridRealization.getModel()).thenReturn(mockModel1); + Mockito.when(hybridRealization.getBatchRealization()).thenReturn(batchRealization); + + aggMock.realization = hybridRealization; + IndexEntity mockIndexEntity1 = new IndexEntity(); + mockIndexEntity1.setId(1); + LayoutEntity mockLayout1 = new LayoutEntity(); + mockLayout1.setIndex(mockIndexEntity1); + aggMock.storageContext.setCandidate(new NLayoutCandidate(mockLayout1)); + aggMock.storageContext.setLayoutId(20001L); + aggMock.storageContext.setPrunedSegments(Lists.newArrayList(new NDataSegment())); + OLAPContext.registerContext(aggMock); + + Mockito.doNothing().when(queryService).clearThreadLocalContexts(); + mockQueryWithSqlMassage(); + } + private void mockOLAPContextWithStreaming() throws Exception { val modelManager = Mockito.spy(NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), "demo")); @@ -1991,28 +2024,48 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { @Test public void testQueryContextWithFusionModel() throws Exception { final String project = "streaming_test"; - final String sql = "select count(*) from SSB_STREAMING"; - stubQueryConnection(sql, project); - mockOLAPContextWithHybrid(); + { + final String sql = "select count(*) from SSB_STREAMING"; - final SQLRequest request = new SQLRequest(); - request.setProject(project); - request.setSql(sql); - Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService); - SQLResponse sqlResponse = queryService.doQueryWithCache(request); + stubQueryConnection(sql, project); + mockOLAPContextWithHybrid(); - Assert.assertEquals(2, sqlResponse.getNativeRealizations().size()); + final SQLRequest request = new SQLRequest(); + request.setProject(project); + request.setSql(sql); + Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService); + SQLResponse sqlResponse = queryService.doQueryWithCache(request); - Assert.assertEquals("4965c827-fbb4-4ea1-a744-3f341a3b030d", - sqlResponse.getNativeRealizations().get(0).getModelId()); - Assert.assertEquals((Long) 10001L, sqlResponse.getNativeRealizations().get(0).getLayoutId()); - Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd", - sqlResponse.getNativeRealizations().get(1).getModelId()); - Assert.assertEquals((Long) 20001L, sqlResponse.getNativeRealizations().get(1).getLayoutId()); + Assert.assertEquals(2, sqlResponse.getNativeRealizations().size()); - Assert.assertTrue(sqlResponse.getNativeRealizations().get(0).isStreamingLayout()); - Assert.assertFalse(sqlResponse.getNativeRealizations().get(1).isStreamingLayout()); + Assert.assertEquals("4965c827-fbb4-4ea1-a744-3f341a3b030d", + sqlResponse.getNativeRealizations().get(0).getModelId()); + Assert.assertEquals((Long) 10001L, sqlResponse.getNativeRealizations().get(0).getLayoutId()); + Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd", + sqlResponse.getNativeRealizations().get(1).getModelId()); + Assert.assertEquals((Long) 20001L, sqlResponse.getNativeRealizations().get(1).getLayoutId()); + + Assert.assertTrue(sqlResponse.getNativeRealizations().get(0).isStreamingLayout()); + Assert.assertFalse(sqlResponse.getNativeRealizations().get(1).isStreamingLayout()); + } + { + final String sql = "select count(1) from SSB_STREAMING"; + + stubQueryConnection(sql, project); + mockOLAPContextWithBatchPart(); + + final SQLRequest request = new SQLRequest(); + request.setProject(project); + request.setSql(sql); + Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService); + SQLResponse sqlResponse = queryService.doQueryWithCache(request); + + Assert.assertEquals(1, sqlResponse.getNativeRealizations().size()); + Assert.assertFalse(sqlResponse.getNativeRealizations().get(0).isStreamingLayout()); + Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd", + sqlResponse.getNativeRealizations().get(0).getModelId()); + } } @Test