This is an automated email from the ASF dual-hosted git repository.
pfzhan pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 103a087051 KYLIN-6028 fix parallel build/refresh job
103a087051 is described below
commit 103a087051ca8259f99ed7b13b909b9d48879cae
Author: 夏旭晨 <[email protected]>
AuthorDate: Tue Nov 19 19:09:23 2024 +0800
KYLIN-6028 fix parallel build/refresh job
Co-authored-by: xuchen.xia <[email protected]>
---
.../common/persistence/metadata/AuditLogStore.java | 20 +++++
.../metadata/FileSystemMetadataStore.java | 14 +--
.../metadata/cube/model/NDataSegmentManager.java | 21 ++++-
.../apache/kylin/job/service/JobInfoService.java | 6 +-
.../kylin/rest/service/ModelServiceBuildTest.java | 99 ++++++++++++++++++++++
.../kylin/rest/service/ModelServiceTest.java | 10 ++-
6 files changed, 158 insertions(+), 12 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
index 7273a1e05c..361b27d973 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/AuditLogStore.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.persistence.AuditLog;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.UnitMessages;
@@ -54,6 +55,47 @@ public interface AuditLogStore extends Closeable {
getReplayWorker().waitForCatchup(getMaxId(), getConfig().getCatchUpTimeout());
}
+ /**
+ * Catches up with the audit log on a new thread and blocks until that thread finishes.
+ * <p>
+ * If the current thread is inside a transaction, the latest audit log id cannot be read during
+ * catch-up because the transaction isolation level is repeatable read, so the catch-up runs on
+ * a fresh thread instead.
+ * <p>
+ * An exception thrown on the worker thread never reaches the thread that joins it, so the
+ * failure is captured on the worker and re-thrown here once {@code join()} has returned.
+ *
+ * @throws KylinRuntimeException if the catch-up fails; a {@link TimeoutException} or any other
+ *         runtime exception becomes the cause, an existing KylinRuntimeException is re-thrown as is
+ */
+ default void catchupWithTimeoutInNewThread() {
+ // Written only by the worker and read only after join(), which makes the write visible here.
+ final KylinRuntimeException[] failure = new KylinRuntimeException[1];
+ Thread catchupThread = new Thread(() -> {
+ try {
+ this.catchupWithTimeout();
+ } catch (TimeoutException e) {
+ failure[0] = new KylinRuntimeException(e);
+ } catch (KylinRuntimeException e) {
+ failure[0] = e;
+ } catch (RuntimeException e) {
+ failure[0] = new KylinRuntimeException(e);
+ }
+ });
+ catchupThread.start();
+ try {
+ catchupThread.join();
+ } catch (InterruptedException ignored) {
+ // Preserve the interrupt status and stop waiting; the worker may still be running,
+ // so its outcome is deliberately not inspected.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (failure[0] != null) {
+ throw failure[0];
+ }
+ }
+
default void catchupWithMaxTimeout() throws TimeoutException {
val store = ResourceStore.getKylinMetaStore(getConfig());
getReplayWorker().catchupFrom(store.getOffset());
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
index aa6074966d..430c1b45c7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileSystemMetadataStore.java
@@ -229,6 +229,13 @@ public class FileSystemMetadataStore extends MetadataStore {
List<T> resList;
val context = convertConditionsToFilter(filter, type);
Class<T> resourceClass = (Class<T>) type.getResourceClass();
+ if (needLock) {
+ if (context.isWholePath()) {
+ lockResource(context.getResPath());
+ } else {
+ lockResource(type.name());
+ }
+ }
try {
if (this.type == Type.DIR) {
resList = getFromDir(context, needContent, resourceClass);
@@ -238,13 +245,6 @@ public class FileSystemMetadataStore extends MetadataStore {
} catch (IOException e) {
throw new KylinRuntimeException("get resource fail", e);
}
- if (needLock) {
- if (context.isWholePath()) {
- lockResource(context.getResPath());
- } else {
- lockResource(type.name());
- }
- }
return resList;
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
index 51ed396d8c..ffb0941c59 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegmentManager.java
@@ -22,13 +22,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinRuntimeException;
import org.apache.kylin.common.persistence.MetadataType;
-import org.apache.kylin.common.persistence.RawResourceFilter;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.metadata.Manager;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.model.Segments;
@@ -115,8 +115,25 @@ public class NDataSegmentManager extends Manager<NDataSegment> {
}
public Segments<NDataSegment> getSegments(NDataflow df, Collection<String>
segIds) {
+ return getSegmentsWithForceCatchupIfInconsistent(df, segIds, true);
+ }
+
+ private Segments<NDataSegment>
getSegmentsWithForceCatchupIfInconsistent(NDataflow df, Collection<String>
segIds,
+ boolean needCatch) {
List<NDataSegment> segments = segIds.stream().map(uuid ->
getWithoutInitDataflow(uuid).orElse(null))
.filter(Objects::nonNull).collect(Collectors.toList());
+ if (segments.size() < segIds.size()) {
+ logger.warn("Segments are inconsistency with dataflow, force to
catchup: {}", needCatch);
+ if (needCatch) {
+ try {
+
ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).getAuditLogStore()
+ .catchupWithTimeoutInNewThread();
+ } catch (KylinRuntimeException e) {
+ logger.warn("Manually catchup auditlog Failed.", e);
+ }
+ return getSegmentsWithForceCatchupIfInconsistent(df, segIds,
false);
+ }
+ }
segments.forEach(seg -> seg.setDataflow(df));
return new Segments<>(segments);
}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java b/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
index b1db21e671..2cd61ccf52 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/job/service/JobInfoService.java
@@ -96,6 +96,7 @@ import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.constant.Constant;
@@ -619,7 +620,10 @@ public class JobInfoService extends BasicService implements JobSupporter {
return;
}
killExistApplication(job);
- getManager(ExecutableManager.class, project).discardJob(job.getId());
+ EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+ getManager(ExecutableManager.class,
project).discardJob(job.getId());
+ return true;
+ }, project);
if (getConfig().isMailEnabled()) {
job.notifyUser(MailNotificationType.JOB_DISCARDED);
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 31d1dd9500..d43e96c4ad 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -30,6 +30,7 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_STA
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -96,6 +97,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.rest.config.initialize.ModelBrokenListener;
+import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.ModelRequest;
import org.apache.kylin.rest.request.PartitionsRefreshRequest;
import org.apache.kylin.rest.request.SegmentTimeRequest;
@@ -120,6 +122,9 @@ import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.test.util.ReflectionTestUtils;
import lombok.val;
@@ -1066,6 +1071,100 @@ public class ModelServiceBuildTest extends SourceTestCase {
Assert.assertEquals(3,
ExecutableParams.getBuckets(job.getParam("buckets")).size());
}
+ @Test
+ public void testParallelSubmitBuildJob() throws Exception {
+ val modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
+ val project = "default";
+ NDataflowManager dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ ExecutableManager executableManager =
ExecutableManager.getInstance(getTestConfig(), project);
+ doParallelSubmitBuildJob("1633017600000", "1633104000000",
"1633190400000");
+
+ Assert.assertEquals(2,
dataflowManager.getDataflow(modelId).getSegments().size());
+ Assert.assertEquals(2, executableManager.getAllExecutables().size());
+ }
+
+ @Test
+ public void testParallelSubmitBuildJobWithOverlap() throws Exception {
+ val modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
+ val project = "default";
+ NDataflowManager dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ ExecutableManager executableManager =
ExecutableManager.getInstance(getTestConfig(), project);
+ doParallelSubmitBuildJob("1633017600000", "1633104000000", null);
+
+ Assert.assertEquals(1,
dataflowManager.getDataflow(modelId).getSegments().size());
+ Assert.assertEquals(1, executableManager.getAllExecutables().size());
+ }
+
+ private void doParallelSubmitBuildJob(String start, String end, String
end2) throws Exception {
+ val modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
+ val project = "default";
+
+ NDataflowManager dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ NDataflow dataflow = dataflowManager.getDataflow(modelId);
+ val model = dataflow.getModel();
+ indexDataConstructor.cleanSegments(modelId);
+
+ Thread t1 = new Thread(getSubmitJob(project, model, start, end));
+ Thread t2 = new Thread(getSubmitJob(project, model, end2 == null ?
start : end, end2 == null ? end : end2));
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ }
+
+ private Runnable getSubmitJob(String project, NDataModel model, String
start, String end) {
+ return () -> {
+ try {
+ Authentication authentication = new
TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+
SecurityContextHolder.getContext().setAuthentication(authentication);
+ modelBuildService.incrementBuildSegmentsManually(new
IncrementBuildSegmentParams(project,
+ model.getUuid(), start, end, model.getPartitionDesc(),
null, null, true, null));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ @Test
+ public void testParallelSubmitRefreshJob() throws InterruptedException {
+ String project = getProject();
+ val modelId = "741ca86a-1f13-46da-a59f-95fb68615e3a";
+ ExecutableManager executableManager =
ExecutableManager.getInstance(getTestConfig(), project);
+ NDataflowManager dataflowManager =
NDataflowManager.getInstance(getTestConfig(), project);
+ val df = dataflowManager.getDataflow(modelId);
+ // remove the existed seg
+ indexDataConstructor.cleanSegments(modelId);
+
+ long start = SegmentRange.dateToLong("2010-01-02");
+ long end = SegmentRange.dateToLong("2010-01-03");
+ SegmentRange segmentRange = new
SegmentRange.TimePartitionedSegmentRange(start, end);
+ val dataSegment = indexDataConstructor.addSegment(modelId,
segmentRange, SegmentStatusEnum.READY, null);
+ NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
+ update.setToAddOrUpdateLayouts(
+ generateAllDataLayout(getProject(), modelId,
Collections.singletonList(dataSegment)));
+
+ EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
+ () ->
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).updateDataflow(update),
+ getProject());
+
+ Runnable submitJob = () -> {
+ Authentication authentication = new
TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+
SecurityContextHolder.getContext().setAuthentication(authentication);
+ indexDataConstructor.transactionWrap(project,
+ () -> modelBuildService.refreshSegmentById(new
RefreshSegmentParams(project,
+ "741ca86a-1f13-46da-a59f-95fb68615e3a", new
String[] { dataSegment.getId() })));
+ };
+ Thread t1 = new Thread(submitJob);
+ Thread t2 = new Thread(submitJob);
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+
+ Assert.assertEquals(2,
dataflowManager.getDataflow(modelId).getSegments().size());
+ Assert.assertEquals(1, executableManager.getAllExecutables().size());
+ }
+
@Test
public void testBuildMultiPartitionSegments() throws Exception {
val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
index 8cf66dcc52..d29e83308a 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ModelServiceTest.java
@@ -1275,11 +1275,17 @@ public class ModelServiceTest extends SourceTestCase {
@Test
public void testRenameModel() {
- modelService.renameDataModel("default",
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "new_name", "");
+ UnitOfWork.doInTransactionWithRetry(() -> {
+ modelService.renameDataModel("default",
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "new_name", "");
+ return null;
+ }, "test");
List<NDataModelResponse> models = modelService.getModels("new_name",
"default", true, "", null, "last_modify",
true);
Assert.assertEquals("new_name", models.get(0).getAlias());
- modelService.renameDataModel("default",
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "New_Name", "test desc");
+ UnitOfWork.doInTransactionWithRetry(() -> {
+ modelService.renameDataModel("default",
"89af4ee2-2cdb-4b07-b39e-4c29856309aa", "New_Name", "test desc");
+ return null;
+ }, "test2");
models = modelService.getModels("new_name", "default", true, "", null,
"last_modify", true);
Assert.assertEquals("new_name", models.get(0).getAlias());
Assert.assertEquals("test desc", models.get(0).getDescription());