This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 294895e4eec2a99dd22635bf41e51fac7d509465 Author: Pengfei Zhan <pengfei.z...@kyligence.io> AuthorDate: Tue Sep 27 21:29:12 2022 +0800 KYLIN-5320 check and update dataflow lastQueryTime --- .../service/task/QueryHistoryTaskScheduler.java | 19 +++++---- .../kylin/rest/service/UserAclServiceTest.java | 7 ++-- .../task/QueryHistoryTaskSchedulerRunnerTest.java | 34 +++++++-------- .../task/QueryHistoryTaskSchedulerTest.java | 48 ++++++++++++++++++---- 4 files changed, 71 insertions(+), 37 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java index 21764634a9..1d3664e4ec 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java @@ -36,6 +36,13 @@ import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.optimization.FrequencyMap; +import org.apache.kylin.metadata.epoch.EpochManager; +import org.apache.kylin.metadata.favorite.AbstractAsyncTask; +import org.apache.kylin.metadata.favorite.AccelerateRuleUtil; +import org.apache.kylin.metadata.favorite.AsyncAccelerationTask; +import org.apache.kylin.metadata.favorite.AsyncTaskManager; +import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset; +import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager; import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; @@ -53,13 +60,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.kylin.metadata.epoch.EpochManager; -import org.apache.kylin.metadata.favorite.AbstractAsyncTask; -import org.apache.kylin.metadata.favorite.AccelerateRuleUtil; -import org.apache.kylin.metadata.favorite.AsyncAccelerationTask; -import org.apache.kylin.metadata.favorite.AsyncTaskManager; -import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset; -import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager; import lombok.Data; import lombok.Getter; import lombok.val; @@ -254,7 +254,7 @@ public class QueryHistoryTaskScheduler { } val snapshotsInRealization = queryHistory.getQueryHistoryInfo().getQuerySnapshots(); for (val snapshots : snapshotsInRealization) { - snapshots.stream().forEach(tableIdentify -> { + snapshots.forEach(tableIdentify -> { results.merge(tableManager.getOrCreateTableExt(tableIdentify), 1, Integer::sum); }); } @@ -305,6 +305,9 @@ public class QueryHistoryTaskScheduler { for (Map.Entry<String, Long> entry : modelsLastQueryTime.entrySet()) { String dataflowId = entry.getKey(); Long lastQueryTime = entry.getValue(); + if (dfManager.getDataflow(dataflowId) == null) { + continue; + } dfManager.updateDataflow(dataflowId, copyForWrite -> copyForWrite.setLastQueryTime(lastQueryTime)); } } diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java index 9ac59fb40c..35c7e70657 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/UserAclServiceTest.java @@ -29,6 +29,8 @@ import java.util.Locale; import org.apache.commons.collections4.CollectionUtils; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.metadata.epoch.EpochManager; +import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.GlobalAccessRequest; import org.apache.kylin.rest.request.GlobalBatchAccessRequest; @@ -38,6 +40,7 @@ import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.SpringContext; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -52,9 +55,6 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.test.util.ReflectionTestUtils; -import org.apache.kylin.metadata.epoch.EpochManager; -import org.apache.kylin.metadata.user.ManagedUser; - public class UserAclServiceTest extends ServiceTestBase { @Mock @@ -120,6 +120,7 @@ public class UserAclServiceTest extends ServiceTestBase { userAclService.grantUserAclPermission("admin", "DATA_QUERY"); } + @Ignore("very unstable") @Test public void testGetAllUsersHasGlobalPermission() { KylinUserService kylinUserService = new KylinUserService() { diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java similarity index 82% rename from src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java rename to src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java index b6fa055350..64a6aa44b9 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerRunnerTest.java @@ -18,13 +18,15 @@ package org.apache.kylin.rest.service.task; +import static org.awaitility.Awaitility.await; + import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.rest.util.SpringContext; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -69,29 +71,20 @@ public class QueryHistoryTaskSchedulerRunnerTest extends NLocalFileMetadataTestC val queryHistoryAccelerateRunnerMock = qhAccelerateScheduler.new QueryHistoryAccelerateRunner(false) { @Override public void work() { - try { - TimeUnit.SECONDS.sleep(mockSleepTimeSecs); + await().pollDelay(mockSleepTimeSecs, TimeUnit.SECONDS).until(() -> { internalExecute.add((System.currentTimeMillis() - startTime) / 1000); //mock exception throw new RuntimeException("test for exception"); - } catch (InterruptedException e) { - log.error("queryHistoryAccelerateRunnerMock is interrupted", e); - } + }); } - }; val queryHistoryMetaUpdateRunnerMock = qhAccelerateScheduler.new QueryHistoryMetaUpdateRunner() { @Override public void work() { - try { - TimeUnit.SECONDS.sleep(mockSleepTimeSecs); - } catch (InterruptedException e) { - log.error("queryHistoryMetaUpdateRunner is interrupted", e); - } + await().pollDelay(mockSleepTimeSecs, TimeUnit.SECONDS); } - }; ReflectionTestUtils.setField(qhAccelerateScheduler, "taskScheduler", Executors.newScheduledThreadPool(1, @@ -101,20 +94,23 @@ public class QueryHistoryTaskSchedulerRunnerTest extends NLocalFileMetadataTestC val schedulerService = (ScheduledExecutorService) ReflectionTestUtils.getField(qhAccelerateScheduler, "taskScheduler"); + Assert.assertNotNull(schedulerService); schedulerService.scheduleWithFixedDelay(queryHistoryAccelerateRunnerMock, 0, mockSchedulerDelay, TimeUnit.SECONDS); schedulerService.scheduleWithFixedDelay(queryHistoryMetaUpdateRunnerMock, 0, mockSchedulerDelay, TimeUnit.SECONDS); val schedulerNum = 10; + await().pollDelay(schedulerNum, TimeUnit.SECONDS).until(() -> { + Assert.assertEquals(internalExecute.size(), schedulerNum / (mockSchedulerDelay + mockSleepTimeSecs)); - TimeUnit.SECONDS.sleep(schedulerNum); - - Assert.assertEquals(internalExecute.size(), schedulerNum / (mockSchedulerDelay + mockSleepTimeSecs)); + for (int i = 0; i < internalExecute.size(); i++) { + Assert.assertEquals(internalExecute.get(i), i * mockSchedulerDelay + mockSleepTimeSecs * (i + 1), + 1); + } + return null; + }); - for (int i = 0; i < internalExecute.size(); i++) { - Assert.assertEquals(internalExecute.get(i), i * mockSchedulerDelay + mockSleepTimeSecs * (i + 1), 1); - } } catch (Exception e) { log.error("test qhAccelerateScheduler error :", e); } finally { diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java similarity index 93% rename from src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java rename to src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java index c026e85c34..dc4dba454e 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java @@ -18,17 +18,15 @@ package org.apache.kylin.rest.service.task; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.List; import java.util.Map; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.TimeUtil; -import org.apache.kylin.metadata.model.TableExtDesc; -import org.apache.kylin.rest.service.IUserGroupService; -import org.apache.kylin.rest.util.SpringContext; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.junit.TimeZoneTestRunner; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; @@ -38,11 +36,15 @@ import org.apache.kylin.metadata.favorite.AsyncTaskManager; import org.apache.kylin.metadata.favorite.QueryHistoryIdOffset; import org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager; import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.query.QueryHistory; import org.apache.kylin.metadata.query.QueryHistoryInfo; import org.apache.kylin.metadata.query.QueryMetrics; import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO; +import org.apache.kylin.rest.service.IUserGroupService; import org.apache.kylin.rest.service.NUserGroupService; +import org.apache.kylin.rest.service.task.QueryHistoryTaskScheduler.QueryHistoryMetaUpdateRunner; +import org.apache.kylin.rest.util.SpringContext; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,9 +62,12 @@ import org.springframework.security.acls.domain.PermissionFactory; import org.springframework.security.acls.model.PermissionGrantingStrategy; import org.springframework.test.util.ReflectionTestUtils; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import lombok.val; + @RunWith(PowerMockRunner.class) @PowerMockRunnerDelegate(TimeZoneTestRunner.class) @PrepareForTest({ SpringContext.class, UserGroupInformation.class }) @@ -73,10 +78,11 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { private static final String LAYOUT1 = "20000000001"; private static final String LAYOUT2 = "1000001"; private static final Long QUERY_TIME = 1586760398338L; + + private QueryHistoryTaskScheduler qhAccelerateScheduler; + @Mock private final IUserGroupService userGroupService = Mockito.spy(NUserGroupService.class); - int startOffset = 0; - private QueryHistoryTaskScheduler qhAccelerateScheduler; @Before public void setUp() throws Exception { @@ -214,6 +220,32 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { Assert.assertEquals(8, idOffsetManager.get().getStatMetaUpdateOffset()); } + @Test + public void testUpdateLastQueryTime() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + + // before update dataflow usage, layout usage and last query time + NDataflow dataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT) + .getDataflow(DATAFLOW); + Assert.assertEquals(3, dataflow.getQueryHitCount()); + Assert.assertNull(dataflow.getLayoutHitCount().get(20000000001L)); + Assert.assertNull(dataflow.getLayoutHitCount().get(1000001L)); + Assert.assertEquals(0L, dataflow.getLastQueryTime()); + + val queryHistoryAccelerateRunner = qhAccelerateScheduler.new QueryHistoryMetaUpdateRunner(); + Class<? extends QueryHistoryMetaUpdateRunner> clazz = queryHistoryAccelerateRunner.getClass(); + Method method = clazz.getDeclaredMethod("updateLastQueryTime", Map.class, String.class); + method.setAccessible(true); + method.invoke(queryHistoryAccelerateRunner, ImmutableMap.of("aaa", 100L), PROJECT); + method.invoke(queryHistoryAccelerateRunner, ImmutableMap.of(DATAFLOW, 100L), PROJECT); + method.setAccessible(false); + + NDataflow dataflow1 = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT) + .getDataflow(DATAFLOW); + long lastQueryTime = dataflow1.getLastQueryTime(); + Assert.assertEquals(100L, lastQueryTime); + } + @Test public void testUpdateMetadataWithStringRealization() { qhAccelerateScheduler.queryHistoryDAO = Mockito.mock(RDBMSQueryHistoryDAO.class); @@ -485,4 +517,6 @@ public class QueryHistoryTaskSchedulerTest extends NLocalFileMetadataTestCase { return histories; } + int startOffset = 0; + }