This is an automated email from the ASF dual-hosted git repository.

pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 103a087051 KYLIN-6028 fix parallel build/refresh job
103a087051 is described below

commit 103a087051ca8259f99ed7b13b909b9d48879cae
Author: 夏旭晨 <[email protected]>
AuthorDate: Tue Nov 19 19:09:23 2024 +0800

    KYLIN-6028 fix parallel build/refresh job
    
    Co-authored-by: xuchen.xia <[email protected]>
---
 .../common/persistence/metadata/AuditLogStore.java | 20 +++++
 .../metadata/FileSystemMetadataStore.java          | 14 +--
 .../metadata/cube/model/NDataSegmentManager.java   | 21 ++++-
 .../apache/kylin/job/service/JobInfoService.java   |  6 +-
 .../kylin/rest/service/ModelServiceBuildTest.java  | 99 ++++++++++++++++++++++
 .../kylin/rest/service/ModelServiceTest.java       | 10 ++-
 6 files changed, 158 insertions(+), 12 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
index 7273a1e05c..361b27d973 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.persistence.AuditLog;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.UnitMessages;
@@ -54,6 +55,25 @@ public interface AuditLogStore extends Closeable {
         getReplayWorker().waitForCatchup(getMaxId(), 
getConfig().getCatchUpTimeout());
     }
 
+    /**
+     * Runs {@link #catchupWithTimeout()} on a freshly started thread and blocks until it finishes.
+     * <p>
+     * If the current thread is inside a transaction, the latest audit log id cannot be seen during
+     * catchup because the transaction isolation level is repeatable, so the catchup is performed on a
+     * new thread that is not part of that transaction.
+     * <p>
+     * If the calling thread is interrupted while waiting, its interrupt flag is restored and the method
+     * returns immediately; the catchup thread is left running.
+     *
+     * @throws KylinRuntimeException if the catchup on the worker thread times out or fails
+     */
+    default void catchupWithTimeoutInNewThread() {
+        // Filled in by the worker; reading it after join() is safe because join() establishes happens-before.
+        final Exception[] failure = new Exception[1];
+        Thread catchupThread = new Thread(() -> {
+            try {
+                this.catchupWithTimeout();
+            } catch (TimeoutException | RuntimeException e) {
+                // Capture instead of throwing: an exception thrown here would only reach the worker's
+                // uncaught-exception handler and never the caller of this method.
+                failure[0] = e;
+            }
+        }, "auditlog-catchup");
+        catchupThread.start();
+        try {
+            catchupThread.join();
+        } catch (InterruptedException e) {
+            // Stop waiting but keep the interrupt status visible to the caller.
+            Thread.currentThread().interrupt();
+            return;
+        }
+        if (failure[0] != null) {
+            throw new KylinRuntimeException(failure[0]);
+        }
+    }
+
     default void catchupWithMaxTimeout() throws TimeoutException {
         val store = ResourceStore.getKylinMetaStore(getConfig());
         getReplayWorker().catchupFrom(store.getOffset());
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
index aa6074966d..430c1b45c7 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
@@ -229,6 +229,13 @@ public class FileSystemMetadataStore extends MetadataStore 
{
         List<T> resList;
         val context = convertConditionsToFilter(filter, type);
         Class<T> resourceClass = (Class<T>) type.getResourceClass();
+        if (needLock) {
+            if (context.isWholePath()) {
+                lockResource(context.getResPath());
+            } else {
+                lockResource(type.name());
+            }
+        }
         try {
             if (this.type == Type.DIR) {
                 resList = getFromDir(context, needContent, resourceClass);
@@ -238,13 +245,6 @@ public class FileSystemMetadataStore extends MetadataStore 
{
         } catch (IOException e) {
             throw new KylinRuntimeException("get resource fail", e);
         }
-        if (needLock) {
-            if (context.isWholePath()) {
-                lockResource(context.getResPath());
-            } else {
-                lockResource(type.name());
-            }
-        }
         return resList;
     }
 
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
index 51ed396d8c..ffb0941c59 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
@@ -22,13 +22,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
 import org.apache.kylin.common.persistence.MetadataType;
-import org.apache.kylin.common.persistence.RawResourceFilter;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.metadata.Manager;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.model.Segments;
@@ -115,8 +115,25 @@ public class NDataSegmentManager extends 
Manager<NDataSegment> {
     }
 
     public Segments<NDataSegment> getSegments(NDataflow df, Collection<String> segIds) {
+        return getSegmentsWithForceCatchupIfInconsistent(df, segIds, true);
+    }
+
+    /**
+     * Loads the segments with the given ids and binds each of them to {@code df}.
+     * <p>
+     * When some ids cannot be resolved (the segment metadata is inconsistent with the dataflow) and
+     * {@code needCatchup} is true, the audit log is caught up once and the lookup is retried with
+     * {@code needCatchup = false}. Ids that are still unresolved after that retry are left out of the result.
+     *
+     * @param df          dataflow attached to every returned segment
+     * @param segIds      ids of the segments to load
+     * @param needCatchup whether an audit log catchup may be forced when segments are missing
+     * @return the resolved segments, in the iteration order of {@code segIds}
+     */
+    private Segments<NDataSegment> getSegmentsWithForceCatchupIfInconsistent(NDataflow df, Collection<String> segIds,
+            boolean needCatchup) {
         List<NDataSegment> segments = segIds.stream().map(uuid -> getWithoutInitDataflow(uuid).orElse(null))
                 .filter(Objects::nonNull).collect(Collectors.toList());
+        int missing = segIds.size() - segments.size();
+        if (missing > 0 && needCatchup) {
+            logger.warn("{} of {} segments are inconsistent with the dataflow, forcing audit log catchup", missing,
+                    segIds.size());
+            try {
+                ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).getAuditLogStore()
+                        .catchupWithTimeoutInNewThread();
+            } catch (KylinRuntimeException e) {
+                // Best effort: still retry the lookup with whatever metadata is available.
+                logger.warn("Manual audit log catchup failed.", e);
+            }
+            return getSegmentsWithForceCatchupIfInconsistent(df, segIds, false);
+        }
+        if (missing > 0) {
+            logger.warn("{} of {} segments are still missing after audit log catchup and will be skipped", missing,
+                    segIds.size());
+        }
         segments.forEach(seg -> seg.setDataflow(df));
         return new Segments<>(segments);
     }
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
index b1db21e671..2cd61ccf52 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
@@ -96,6 +96,7 @@ import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.constant.Constant;
@@ -619,7 +620,10 @@ public class JobInfoService extends BasicService 
implements JobSupporter {
             return;
         }
         killExistApplication(job);
-        getManager(ExecutableManager.class, project).discardJob(job.getId());
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            getManager(ExecutableManager.class, 
project).discardJob(job.getId());
+            return true;
+        }, project);
 
         if (getConfig().isMailEnabled()) {
             job.notifyUser(MailNotificationType.JOB_DISCARDED);
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 31d1dd9500..d43e96c4ad 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_STA
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -96,6 +97,7 @@ import 
org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
+import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.ModelRequest;
 import org.apache.kylin.rest.request.PartitionsRefreshRequest;
 import org.apache.kylin.rest.request.SegmentTimeRequest;
@@ -120,6 +122,9 @@ import org.junit.Test;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import lombok.val;
@@ -1066,6 +1071,100 @@ public class ModelServiceBuildTest extends 
SourceTestCase {
         Assert.assertEquals(3, 
ExecutableParams.getBuckets(job.getParam("buckets")).size());
     }
 
+    @Test
+    public void testParallelSubmitBuildJob() throws Exception {
+        final String project = "default";
+        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
+        final NDataflowManager dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        final ExecutableManager execMgr = ExecutableManager.getInstance(getTestConfig(), project);
+
+        // Adjacent ranges (start..end and end..end2) do not overlap, so both submissions should succeed.
+        doParallelSubmitBuildJob("1633017600000", "1633104000000", "1633190400000");
+
+        final int segmentCount = dfMgr.getDataflow(dataflowId).getSegments().size();
+        Assert.assertEquals(2, segmentCount);
+        Assert.assertEquals(2, execMgr.getAllExecutables().size());
+    }
+
+    @Test
+    public void testParallelSubmitBuildJobWithOverlap() throws Exception {
+        final String project = "default";
+        final String dataflowId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
+        final NDataflowManager dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        final ExecutableManager execMgr = ExecutableManager.getInstance(getTestConfig(), project);
+
+        // A null third argument makes both threads submit the very same range: only one may win.
+        doParallelSubmitBuildJob("1633017600000", "1633104000000", null);
+
+        final int segmentCount = dfMgr.getDataflow(dataflowId).getSegments().size();
+        Assert.assertEquals(1, segmentCount);
+        Assert.assertEquals(1, execMgr.getAllExecutables().size());
+    }
+
+    /**
+     * Removes the model's existing segments, then submits two incremental build jobs from two threads
+     * that are released at the same moment.
+     * <p>
+     * The first thread builds {@code start..end}. The second builds {@code end..end2}, or the same
+     * {@code start..end} range again when {@code end2} is null, to provoke an overlap. Exceptions raised
+     * in the worker threads are not rethrown; callers check the outcome via the resulting segments and jobs.
+     */
+    private void doParallelSubmitBuildJob(String start, String end, String end2) throws Exception {
+        val modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
+        val project = "default";
+
+        NDataflowManager dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        val model = dataflowManager.getDataflow(modelId).getModel();
+        indexDataConstructor.cleanSegments(modelId);
+
+        Runnable first = getSubmitJob(project, model, start, end);
+        Runnable second = end2 == null ? getSubmitJob(project, model, start, end)
+                : getSubmitJob(project, model, end, end2);
+
+        // Without a start gate thread 1 may finish before thread 2 starts, and the race is never exercised.
+        java.util.concurrent.CountDownLatch startGate = new java.util.concurrent.CountDownLatch(1);
+        List<Thread> workers = new ArrayList<>();
+        for (Runnable task : Arrays.asList(first, second)) {
+            Thread worker = new Thread(() -> {
+                try {
+                    startGate.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                task.run();
+            });
+            worker.start();
+            workers.add(worker);
+        }
+        startGate.countDown();
+        for (Thread worker : workers) {
+            worker.join();
+        }
+    }
+
+    /**
+     * Builds a task that authenticates as ADMIN on the running thread and then submits a manual
+     * incremental build of {@code model} for {@code start..end}. Any failure is rethrown wrapped in a
+     * {@link RuntimeException}.
+     */
+    private Runnable getSubmitJob(String project, NDataModel model, String start, String end) {
+        return () -> {
+            try {
+                // The security context is set inside the task so it applies to the worker thread
+                // (presumably SecurityContextHolder uses its default thread-local strategy here).
+                Authentication admin = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+                SecurityContextHolder.getContext().setAuthentication(admin);
+                IncrementBuildSegmentParams params = new IncrementBuildSegmentParams(project, model.getUuid(), start,
+                        end, model.getPartitionDesc(), null, null, true, null);
+                modelBuildService.incrementBuildSegmentsManually(params);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    /**
+     * Two threads, released together, both request a refresh of the same READY segment. Only one
+     * refresh job is expected, and the dataflow is expected to end up with two segments.
+     */
+    @Test
+    public void testParallelSubmitRefreshJob() throws InterruptedException {
+        String project = getProject();
+        val modelId = "741ca86a-1f13-46da-a59f-95fb68615e3a";
+        ExecutableManager executableManager = ExecutableManager.getInstance(getTestConfig(), project);
+        NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), project);
+        val df = dataflowManager.getDataflow(modelId);
+        // remove the existing segments
+        indexDataConstructor.cleanSegments(modelId);
+
+        long start = SegmentRange.dateToLong("2010-01-02");
+        long end = SegmentRange.dateToLong("2010-01-03");
+        SegmentRange segmentRange = new SegmentRange.TimePartitionedSegmentRange(start, end);
+        val dataSegment = indexDataConstructor.addSegment(modelId, segmentRange, SegmentStatusEnum.READY, null);
+        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
+        update.setToAddOrUpdateLayouts(
+                generateAllDataLayout(project, modelId, Collections.singletonList(dataSegment)));
+
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
+                () -> NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project).updateDataflow(update),
+                project);
+
+        // Release both submitters together so the two refresh requests actually race.
+        java.util.concurrent.CountDownLatch startGate = new java.util.concurrent.CountDownLatch(1);
+        Runnable submitJob = () -> {
+            try {
+                startGate.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+            Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+            SecurityContextHolder.getContext().setAuthentication(authentication);
+            indexDataConstructor.transactionWrap(project, () -> modelBuildService.refreshSegmentById(
+                    new RefreshSegmentParams(project, modelId, new String[] { dataSegment.getId() })));
+        };
+        Thread t1 = new Thread(submitJob);
+        Thread t2 = new Thread(submitJob);
+        t1.start();
+        t2.start();
+        startGate.countDown();
+        t1.join();
+        t2.join();
+
+        Assert.assertEquals(2, dataflowManager.getDataflow(modelId).getSegments().size());
+        Assert.assertEquals(1, executableManager.getAllExecutables().size());
+    }
+
     @Test
     public void testBuildMultiPartitionSegments() throws Exception {
         val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index 8cf66dcc52..d29e83308a 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -1275,11 +1275,17 @@ public class ModelServiceTest extends SourceTestCase {
 
     @Test
     public void testRenameModel() {
-        modelService.renameDataModel("default", 
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "new_name", "");
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            modelService.renameDataModel("default", 
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "new_name", "");
+            return null;
+        }, "test");
         List<NDataModelResponse> models = modelService.getModels("new_name", 
"default", true, "", null, "last_modify",
                 true);
         Assert.assertEquals("new_name", models.get(0).getAlias());
-        modelService.renameDataModel("default", 
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "New_Name", "test desc");
+        UnitOfWork.doInTransactionWithRetry(() -> {
+            modelService.renameDataModel("default", 
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "New_Name", "test desc");
+            return null;
+        }, "test2");
         models = modelService.getModels("new_name", "default", true, "", null, 
"last_modify", true);
         Assert.assertEquals("new_name", models.get(0).getAlias());
         Assert.assertEquals("test desc", models.get(0).getDescription());

Reply via email to