This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 84f6a0f070 [Improvement-16850][API] Broadcast to cluster when worker
group changed (#16860)
84f6a0f070 is described below
commit 84f6a0f070ca66dd65bab397c17f2a98ed11c84f
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Dec 3 15:36:28 2024 +0800
[Improvement-16850][API] Broadcast to cluster when worker group changed
(#16860)
---
docs/docs/en/guide/upgrade/incompatible.md | 1 +
docs/docs/zh/guide/upgrade/incompatible.md | 1 +
.../api/test/cases/WorkerGroupAPITest.java | 8 +-
.../api/test/pages/security/WorkerGroupPage.java | 4 +-
.../api/controller/WorkerGroupController.java | 23 ++-
.../api/service/WorkerGroupService.java | 5 +-
.../api/service/impl/WorkerGroupServiceImpl.java | 158 +++++++++------------
.../api/service/WorkerGroupServiceTest.java | 127 ++++++++---------
.../dolphinscheduler/dao/entity/WorkerGroup.java | 2 -
.../src/main/resources/sql/dolphinscheduler_h2.sql | 1 -
.../main/resources/sql/dolphinscheduler_mysql.sql | 1 -
.../resources/sql/dolphinscheduler_postgresql.sql | 1 -
.../3.3.0_schema/mysql/dolphinscheduler_ddl.sql | 19 +++
.../postgresql/dolphinscheduler_ddl.sql | 19 +++
.../dao/mapper/WorkerGroupMapperTest.java | 1 -
.../extract/master/IMasterContainerService.java | 29 ++++
.../master/cluster/WorkerGroupChangeNotifier.java | 4 +-
.../server/master/config/MasterConfig.java | 2 +-
.../server/master/rpc/MasterContainerService.java | 39 +++++
.../src/main/resources/application.yaml | 2 +-
20 files changed, 255 insertions(+), 192 deletions(-)
diff --git a/docs/docs/en/guide/upgrade/incompatible.md
b/docs/docs/en/guide/upgrade/incompatible.md
index 12dd7fa356..12152c58a1 100644
--- a/docs/docs/en/guide/upgrade/incompatible.md
+++ b/docs/docs/en/guide/upgrade/incompatible.md
@@ -35,4 +35,5 @@ This document records the incompatible updates between each
version. You need to
* Remove the `Data Quality` module
([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` in `application.yaml`
([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use
`physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in
master's `application.yaml`, please use `logic-task-config`
([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
+* Drop unused column `other_params_json` in `t_ds_worker_group`
([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
diff --git a/docs/docs/zh/guide/upgrade/incompatible.md
b/docs/docs/zh/guide/upgrade/incompatible.md
index 412a66e4b3..530e006c9c 100644
--- a/docs/docs/zh/guide/upgrade/incompatible.md
+++ b/docs/docs/zh/guide/upgrade/incompatible.md
@@ -33,4 +33,5 @@
* 移除 `数据质量` 模块
([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* 在`application.yaml`中移除`registry-disconnect-strategy`配置
([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
*
在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代
([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
+* 在`t_ds_worker_group` 表中移除 无用的`other_params_json`字段
([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)
diff --git
a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java
b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java
index bf63d23392..36cb88ccb8 100644
---
a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java
+++
b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java
@@ -75,8 +75,12 @@ public class WorkerGroupAPITest {
@Test
@Order(1)
public void testSaveWorkerGroup() {
- HttpResponse saveWorkerGroupHttpResponse = workerGroupPage
- .saveWorkerGroup(loginUser, 1, "test_worker_group",
"10.5.0.5:1234", "test", null);
+ HttpResponse saveWorkerGroupHttpResponse =
workerGroupPage.saveWorkerGroup(
+ loginUser,
+ 0,
+ "test_worker_group",
+ "10.5.0.5:1234",
+ "test");
Assertions.assertTrue(saveWorkerGroupHttpResponse.getBody().getSuccess());
HttpResponse queryAllWorkerGroupsResponse =
workerGroupPage.queryAllWorkerGroups(loginUser);
diff --git
a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java
b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java
index 6ea77b743a..1d2c911969 100644
---
a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java
+++
b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java
@@ -32,15 +32,13 @@ public class WorkerGroupPage {
private String sessionId;
- public HttpResponse saveWorkerGroup(User loginUser, int id, String name,
String addrList, String description,
- String otherParamsJson) {
+ public HttpResponse saveWorkerGroup(User loginUser, int id, String name,
String addrList, String description) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
params.put("id", id);
params.put("name", name);
params.put("addrList", addrList);
params.put("description", description);
- params.put("otherParamsJson", otherParamsJson);
Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
index 8d025bd873..fb3212240a 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
@@ -29,6 +29,7 @@ import
org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.Map;
@@ -51,16 +52,13 @@ import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
-/**
- * worker group controller
- */
@Tag(name = "WORKER_GROUP_TAG")
@RestController
@RequestMapping("/worker-groups")
public class WorkerGroupController extends BaseController {
@Autowired
- WorkerGroupService workerGroupService;
+ private WorkerGroupService workerGroupService;
/**
* create or update a worker group
@@ -77,21 +75,18 @@ public class WorkerGroupController extends BaseController {
@Parameter(name = "name", description = "WORKER_GROUP_NAME",
required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "addrList", description = "WORKER_ADDR_LIST",
required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "description", description = "WORKER_DESC",
required = false, schema = @Schema(implementation = String.class)),
- @Parameter(name = "otherParamsJson", description =
"WORKER_PARAMS_JSON", required = false, schema = @Schema(implementation =
String.class)),
})
@PostMapping()
@ResponseStatus(HttpStatus.OK)
@ApiException(SAVE_ERROR)
@OperatorLog(auditType = AuditType.WORKER_GROUP_CREATE)
- public Result saveWorkerGroup(@Parameter(hidden = true)
@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
- @RequestParam(value = "id", required =
false, defaultValue = "0") int id,
- @RequestParam(value = "name") String name,
- @RequestParam(value = "addrList") String
addrList,
- @RequestParam(value = "description",
required = false, defaultValue = "") String description,
- @RequestParam(value = "otherParamsJson",
required = false, defaultValue = "") String otherParamsJson) {
- Map<String, Object> result =
- workerGroupService.saveWorkerGroup(loginUser, id, name,
addrList, description, otherParamsJson);
- return returnDataList(result);
+ public Result<WorkerGroup> saveWorkerGroup(@Parameter(hidden = true)
@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+ @RequestParam(value = "id",
required = false, defaultValue = "0") int id,
+ @RequestParam(value = "name")
String name,
+ @RequestParam(value =
"addrList") String addrList,
+ @RequestParam(value =
"description", required = false, defaultValue = "") String description) {
+ final WorkerGroup workerGroup =
workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description);
+ return Result.success(workerGroup);
}
/**
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 9ef215b65d..13cd3c3e35 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import java.util.List;
import java.util.Map;
@@ -33,11 +34,9 @@ public interface WorkerGroupService {
* @param name worker group name
* @param addrList addr list
* @param description description
- * @param otherParamsJson otherParamsJson
* @return create or update result code
*/
- Map<String, Object> saveWorkerGroup(User loginUser, int id, String name,
String addrList, String description,
- String otherParamsJson);
+ WorkerGroup saveWorkerGroup(User loginUser, int id, String name, String
addrList, String description);
/**
* Query worker group paging
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index c55b868797..4173a0c9e4 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -21,6 +21,7 @@ import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -28,6 +29,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -42,9 +44,10 @@ import
org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
+import org.apache.dolphinscheduler.extract.base.client.Clients;
+import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -55,14 +58,13 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -85,9 +87,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl
implements WorkerGro
@Autowired
private EnvironmentWorkerGroupRelationMapper
environmentWorkerGroupRelationMapper;
- @Autowired
- private ProcessService processService;
-
@Autowired
private ScheduleMapper scheduleMapper;
@@ -107,89 +106,53 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
* @return create or update result code
*/
@Override
- @Transactional
- public Map<String, Object> saveWorkerGroup(User loginUser, int id, String
name, String addrList, String description,
- String otherParamsJson) {
+ public WorkerGroup saveWorkerGroup(User loginUser,
+ int id,
+ String name,
+ String addrList,
+ String description) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser, null,
AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
- putMsg(result, Status.USER_NO_OPERATION_PERM);
- return result;
+ // todo: add permission exception
+ throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if (StringUtils.isEmpty(name)) {
- log.warn("Parameter name can ot be null.");
- putMsg(result, Status.NAME_NULL);
- return result;
+ throw new ServiceException(Status.NAME_NULL);
}
- Date now = new Date();
- WorkerGroup workerGroup = null;
- if (id != 0) {
- workerGroup = workerGroupMapper.selectById(id);
- if (Objects.nonNull(workerGroup) &&
!workerGroup.getName().equals(name)) {
- if (checkWorkerGroupDependencies(workerGroup, result)) {
- return result;
+ checkWorkerGroupAddrList(addrList);
+ final Date now = new Date();
+ final WorkerGroup workerGroup;
+ try {
+ if (id == 0) {
+ // insert
+ workerGroup = new WorkerGroup();
+ workerGroup.setCreateTime(now);
+ workerGroup.setName(name);
+ workerGroup.setAddrList(addrList);
+ workerGroup.setUpdateTime(now);
+ workerGroup.setDescription(description);
+ workerGroupMapper.insert(workerGroup);
+ } else {
+ workerGroup = workerGroupMapper.selectById(id);
+ if (workerGroup == null) {
+ throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST,
id);
}
+ // todo: Can we update the worker name?
+ if (!workerGroup.getName().equals(name)) {
+ checkWorkerGroupDependencies(workerGroup, result);
+ }
+ workerGroup.setName(name);
+ workerGroup.setAddrList(addrList);
+ workerGroup.setUpdateTime(now);
+ workerGroup.setDescription(description);
+ workerGroupMapper.updateById(workerGroup);
+ log.info("Update worker group: {} success .", workerGroup);
}
+ boardCastToMasterThatWorkerGroupChanged();
+ return workerGroup;
+ } catch (DuplicateKeyException duplicateKeyException) {
+ throw new ServiceException(Status.NAME_EXIST, name);
}
- if (workerGroup == null) {
- workerGroup = new WorkerGroup();
- workerGroup.setCreateTime(now);
- }
-
- workerGroup.setName(name);
- workerGroup.setAddrList(addrList);
- workerGroup.setUpdateTime(now);
- workerGroup.setDescription(description);
-
- if (checkWorkerGroupNameExists(workerGroup)) {
- log.warn("Worker group with the same name already exists,
name:{}.", workerGroup.getName());
- putMsg(result, Status.NAME_EXIST, workerGroup.getName());
- return result;
- }
- String invalidAddr = checkWorkerGroupAddrList(workerGroup);
- if (invalidAddr != null) {
- log.warn("Worker group address is invalid, invalidAddr:{}.",
invalidAddr);
- putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
- return result;
- }
-
- handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser,
otherParamsJson);
- log.info("Worker group save complete, workerGroupName:{}.",
workerGroup.getName());
- putMsg(result, Status.SUCCESS);
- result.put(Constants.DATA_LIST, workerGroup);
- return result;
- }
-
- protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper,
WorkerGroup workerGroup, User loginUser,
- String otherParamsJson) {
- if (workerGroup.getId() != null) {
- workerGroupMapper.updateById(workerGroup);
- } else {
- workerGroupMapper.insert(workerGroup);
- }
- }
-
- /**
- * check worker group name exists
- *
- * @param workerGroup worker group
- * @return boolean
- */
- private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
- // check database
- List<WorkerGroup> workerGroupList =
workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
- if (CollectionUtils.isNotEmpty(workerGroupList)) {
- // create group, the same group name exists in the database
- if (workerGroup.getId() == null) {
- return true;
- }
- // update group, the database exists with the same group name
except itself
- Optional<WorkerGroup> sameNameWorkGroupOptional =
workerGroupList.stream()
- .filter(group -> !Objects.equals(group.getId(),
workerGroup.getId())).findFirst();
- if (sameNameWorkGroupOptional.isPresent()) {
- return true;
- }
- }
- return false;
}
/**
@@ -240,23 +203,16 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
return false;
}
- /**
- * check worker group addr list
- *
- * @param workerGroup worker group
- * @return boolean
- */
- private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
- if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
- return null;
+ private void checkWorkerGroupAddrList(String workerGroupAddress) {
+ if (Strings.isNullOrEmpty(workerGroupAddress)) {
+ return;
}
Map<String, String> serverMaps =
registryClient.getServerMaps(RegistryNodeType.WORKER);
- for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
+ for (String addr : workerGroupAddress.split(Constants.COMMA)) {
if (!serverMaps.containsKey(addr)) {
- return addr;
+ throw new ServiceException(Status.WORKER_ADDRESS_INVALID);
}
}
- return null;
}
/**
@@ -438,4 +394,20 @@ public class WorkerGroupServiceImpl extends
BaseServiceImpl implements WorkerGro
Schedule::getWorkerGroup));
}
+ private void boardCastToMasterThatWorkerGroupChanged() {
+ final List<Server> masters =
registryClient.getServerList(RegistryNodeType.MASTER);
+ if (CollectionUtils.isEmpty(masters)) {
+ return;
+ }
+ for (Server master : masters) {
+ try {
+ Clients.withService(IMasterContainerService.class)
+ .withHost(master.getHost() + ":" + master.getPort())
+ .refreshWorkerGroup();
+ } catch (Exception e) {
+ log.error("Broadcast to master: {} that worker group changed
failed", master, e);
+ }
+ }
+ }
+
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
index 5cce297d8a..86ea324fd6 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
@@ -17,9 +17,12 @@
package org.apache.dolphinscheduler.api.service;
+import static
org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow;
+import static
org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE;
import static
org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import
org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
@@ -40,7 +43,6 @@ import
org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
-import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.HashMap;
@@ -60,6 +62,7 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.dao.DuplicateKeyException;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@@ -78,9 +81,6 @@ public class WorkerGroupServiceTest {
@Mock
private WorkflowInstanceMapper workflowInstanceMapper;
- @Mock
- private ProcessService processService;
-
@Mock
private RegistryClient registryClient;
@@ -109,85 +109,78 @@ public class WorkerGroupServiceTest {
@Test
public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(false);
- Map<String, Object> result =
- workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME,
"localhost:0000", "test group", "");
- Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
+ assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, () -> {
+ workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME,
"localhost:0000", "test group");
+ });
}
@Test
public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- Map<String, Object> result =
- workerGroupService.saveWorkerGroup(loginUser, 1, "",
"localhost:0000", "test group", "");
- Assertions.assertEquals(Status.NAME_NULL.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
+ assertThrowsServiceException(Status.NAME_NULL, () -> {
+ workerGroupService.saveWorkerGroup(loginUser, 1, "",
"localhost:0000", "test group");
+ });
}
@Test
public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
- List<WorkerGroup> workerGroupList = new ArrayList<WorkerGroup>();
- workerGroupList.add(getWorkerGroup(1));
-
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(workerGroupList);
-
- Map<String, Object> result =
- workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME,
"localhost:0000", "test group", "");
- Assertions.assertEquals(Status.NAME_EXIST.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
+
+ Map<String, String> serverMaps = new HashMap<>();
+ serverMaps.put("localhost:0000", "");
+
+
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
+
when(workerGroupMapper.insert(Mockito.any())).thenThrow(DuplicateKeyException.class);
+ assertThrowsServiceException(Status.NAME_EXIST, () -> {
+ workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME,
"localhost:0000", "test group");
+ });
}
@Test
public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID()
{
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
-
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+ when(workerGroupMapper.selectById(1)).thenReturn(null);
+
when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost1:0000", "");
-
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
-
- Map<String, Object> result =
- workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME,
"localhost:0000", "test group", "");
- Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
+
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
+ assertThrowsServiceException(Status.WORKER_ADDRESS_INVALID, () -> {
+ workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME,
"localhost:0000", "test group");
+ });
}
@Test
public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
-
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
+
when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost:0000", "");
-
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
- Mockito.when(workerGroupMapper.insert(any())).thenReturn(1);
-
- Map<String, Object> result =
- workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME,
"localhost:0000", "test group", "");
- Assertions.assertEquals(Status.SUCCESS.getCode(),
- ((Status) result.get(Constants.STATUS)).getCode());
+
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
+ when(workerGroupMapper.insert(any())).thenReturn(1);
+ assertDoesNotThrow(() -> {
+ workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME,
"localhost:0000", "test group");
+ });
}
@Test
@@ -197,13 +190,13 @@ public class WorkerGroupServiceTest {
ids.add(1);
List<WorkerGroup> workerGroups = new ArrayList<>();
workerGroups.add(getWorkerGroup(1));
-
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
+
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
loginUser.getId(), serviceLogger)).thenReturn(ids);
-
Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
+ when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
Set<String> activeWorkerNodes = new HashSet<>();
activeWorkerNodes.add("localhost:12345");
activeWorkerNodes.add("localhost:23456");
-
Mockito.when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes);
+
when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes);
Result result = workerGroupService.queryAllGroupPaging(loginUser, 1,
1, null);
Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode());
@@ -219,11 +212,11 @@ public class WorkerGroupServiceTest {
@Test
public void
giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
- Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
+ when(workerGroupMapper.selectById(1)).thenReturn(null);
Map<String, Object> notExistResult =
workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
@@ -233,19 +226,19 @@ public class WorkerGroupServiceTest {
@Test
public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
- Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+ when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setId(1);
List<WorkflowInstance> workflowInstances = new
ArrayList<WorkflowInstance>();
workflowInstances.add(workflowInstance);
-
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
+
when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus()))
- .thenReturn(workflowInstances);
+ .thenReturn(workflowInstances);
Map<String, Object> deleteFailed =
workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
@@ -255,23 +248,23 @@ public class WorkerGroupServiceTest {
@Test
public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
User loginUser = getLoginUser();
-
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
+
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP,
1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
-
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
+
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP,
null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
- Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
-
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
+ when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
+
when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null);
- Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
+ when(workerGroupMapper.deleteById(1)).thenReturn(1);
-
Mockito.when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
+
when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
.thenReturn(null);
-
Mockito.when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
+ when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
-
Mockito.when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
+ when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
Map<String, Object> successResult =
workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
index 3119a8dddd..df9077602e 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
@@ -52,6 +52,4 @@ public class WorkerGroup {
@TableField(exist = false)
private boolean systemDefault;
- private String otherParamsJson;
-
}
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index f4cdb31f03..9beb2ffd9b 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -1015,7 +1015,6 @@ CREATE TABLE t_ds_worker_group
create_time datetime NULL DEFAULT NULL,
update_time datetime NULL DEFAULT NULL,
description text NULL DEFAULT NULL,
- other_params_json text NULL DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY name_unique (name)
);
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 4a63f32e8e..7826be4846 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -1012,7 +1012,6 @@ CREATE TABLE `t_ds_worker_group` (
`create_time` datetime NULL DEFAULT NULL COMMENT 'create time',
`update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
`description` text NULL DEFAULT NULL COMMENT 'description',
- `other_params_json` text NULL DEFAULT NULL COMMENT 'other params json',
PRIMARY KEY (`id`),
UNIQUE KEY `name_unique` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index f2a7aae44b..e31ba9184d 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -926,7 +926,6 @@ CREATE TABLE t_ds_worker_group (
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
description text DEFAULT NULL,
- other_params_json text DEFAULT NULL,
PRIMARY KEY (id) ,
CONSTRAINT name_unique UNIQUE (name)
) ;
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
index c78c33ea8a..935ece7ca8 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -225,3 +225,22 @@ DROP PROCEDURE drop_data_quality_tables;
ALTER TABLE `t_ds_workflow_definition` ADD KEY `idx_project_code`
(`project_code`) USING BTREE;
ALTER TABLE `t_ds_workflow_definition_log` ADD KEY `idx_project_code`
(`project_code`) USING BTREE;
+
+-- drop_column_t_ds_worker_group other_params_json
+DROP PROCEDURE if EXISTS drop_column_t_ds_worker_group_other_params_json;
+delimiter d//
+CREATE PROCEDURE drop_column_t_ds_worker_group_other_params_json()
+BEGIN
+ IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
+ WHERE TABLE_NAME='t_ds_worker_group'
+ AND TABLE_SCHEMA=(SELECT DATABASE())
+ AND COLUMN_NAME ='other_params_json')
+ THEN
+ALTER TABLE `t_ds_worker_group`
+ DROP COLUMN `other_params_json`;
+END IF;
+END;
+d//
+delimiter ;
+CALL drop_column_t_ds_worker_group_other_params_json;
+DROP PROCEDURE drop_column_t_ds_worker_group_other_params_json;
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
index 31eaf8f645..9b9812033a 100644
---
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
+++
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -228,3 +228,22 @@ DROP FUNCTION IF EXISTS drop_data_quality_tables();
create index workflow_definition_index_project_code on
t_ds_workflow_definition (project_code);
create index workflow_definition_log_index_project_code on
t_ds_workflow_definition_log (project_code);
+
+-- drop_column_t_ds_worker_group other_params_json
+delimiter d//
+CREATE OR REPLACE FUNCTION drop_column_t_ds_worker_group_other_params_json()
RETURNS void AS $$
+BEGIN
+ IF EXISTS (SELECT 1
+ FROM information_schema.columns
+ WHERE table_name = 't_ds_worker_group'
+ AND column_name = 'other_params_json')
+ THEN
+ALTER TABLE t_ds_worker_group
+DROP COLUMN "other_params_json";
+END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+
+select drop_column_t_ds_worker_group_other_params_json();
+DROP FUNCTION IF EXISTS drop_column_t_ds_worker_group_other_params_json();
\ No newline at end of file
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java
index afd474193a..84f140ff0b 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java
@@ -65,7 +65,6 @@ public class WorkerGroupMapperTest extends BaseDaoTest {
workerGroup.setCreateTime(new Date());
workerGroup.setUpdateTime(new Date());
workerGroup.setSystemDefault(true);
- workerGroup.setOtherParamsJson("");
workerGroup.setAddrList("localhost");
workerGroupMapper.insert(workerGroup);
return workerGroup;
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java
new file mode 100644
index 0000000000..d14f02c047
--- /dev/null
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.master;
+
+import org.apache.dolphinscheduler.extract.base.RpcMethod;
+import org.apache.dolphinscheduler.extract.base.RpcService;
+
+@RpcService
+public interface IMasterContainerService {
+
+ @RpcMethod
+ void refreshWorkerGroup();
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
index 41c5fe0f9d..827ebfc576 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
@@ -70,9 +70,9 @@ public class WorkerGroupChangeNotifier {
listeners.add(listener);
}
- void detectWorkerGroupChanges() {
+ public synchronized void detectWorkerGroupChanges() {
try {
- MapComparator<String, WorkerGroup> mapComparator =
detectChangedWorkerGroups();
+ final MapComparator<String, WorkerGroup> mapComparator =
detectChangedWorkerGroups();
triggerListeners(mapComparator);
workerGroupMap = mapComparator.getNewMap();
} catch (Exception ex) {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 76be92e36c..25f42a8985 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -57,7 +57,7 @@ public class MasterConfig implements Validator {
private MasterServerLoadProtection serverLoadProtection = new
MasterServerLoadProtection();
- private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
+ private Duration workerGroupRefreshInterval = Duration.ofMinutes(5);
private CommandFetchStrategy commandFetchStrategy = new
CommandFetchStrategy();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java
new file mode 100644
index 0000000000..f42d93a731
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.rpc;
+
+import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
+import
org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class MasterContainerService implements IMasterContainerService {
+
+ @Autowired
+ private WorkerGroupChangeNotifier workerGroupChangeNotifier;
+
+ @Override
+ public void refreshWorkerGroup() {
+ workerGroupChangeNotifier.detectWorkerGroupChanges();
+ }
+}
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml
b/dolphinscheduler-master/src/main/resources/application.yaml
index 1233d99acc..bb402a1ed8 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -101,7 +101,7 @@ master:
max-system-memory-usage-percentage-thresholds: 0.7
# Master max disk usage , when the master's disk usage is smaller then
this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.7
- worker-group-refresh-interval: 10s
+ worker-group-refresh-interval: 5m
command-fetch-strategy:
type: ID_SLOT_BASED
config: