This is an automated email from the ASF dual-hosted git repository.
fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4be8fd4372 [ISSUE #8265] Implement Batch Creation of Topics in
RocketMQ Admin (#8267)
4be8fd4372 is described below
commit 4be8fd43720c8635fe135404a7fd000c00bb2a15
Author: guyinyou <[email protected]>
AuthorDate: Fri Jun 7 10:28:36 2024 +0800
[ISSUE #8265] Implement Batch Creation of Topics in RocketMQ Admin (#8267)
* add UPDATE_AND_CREATE_TOPIC_LIST
* support creating or updating topic config in batch
---------
Co-authored-by: guyinyou <[email protected]>
Co-authored-by: gaoyang.cgy <[email protected]>
---
.../broker/processor/AdminBrokerProcessor.java | 81 ++++++++++++++
.../rocketmq/broker/topic/TopicConfigManager.java | 11 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 20 ++++
.../rocketmq/client/impl/MQClientAPIImplTest.java | 23 ++++
.../rocketmq/remoting/protocol/RequestCode.java | 1 +
.../protocol/body/CreateTopicListRequestBody.java | 42 ++++++++
.../header/CreateTopicListRequestHeader.java | 31 ++++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 5 +
.../tools/admin/DefaultMQAdminExtImpl.java | 6 ++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 3 +
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../command/topic/UpdateTopicListSubCommand.java | 118 +++++++++++++++++++++
.../topic/UpdateTopicListSubCommandTest.java | 41 +++++++
13 files changed, 383 insertions(+), 1 deletion(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 40a7a461e8..44bf2a4813 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -116,6 +116,7 @@ import
org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumeQueueData;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -243,6 +244,8 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
return this.updateAndCreateTopic(ctx, request);
+ case RequestCode.UPDATE_AND_CREATE_TOPIC_LIST:
+ return this.updateAndCreateTopicList(ctx, request);
case RequestCode.DELETE_TOPIC_IN_BROKER:
return this.deleteTopic(ctx, request);
case RequestCode.GET_ALL_TOPIC_CONFIG:
@@ -536,6 +539,84 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
+    /**
+     * Creates or updates a batch of topics carried in the request body.
+     *
+     * <p>Every entry is validated first (topic name, optional system-topic conflict check, MIXED
+     * message type support). The first invalid entry fails the whole request with
+     * {@code SYSTEM_ERROR} before any config is written. Entries equal to the config already held
+     * in the topic config table are skipped as idempotent; only the remaining entries are applied
+     * and registered.
+     *
+     * @param ctx channel context, used to log the caller address
+     * @param request remoting command whose body decodes to a {@link CreateTopicListRequestBody}
+     * @return response with {@code SUCCESS}, or {@code SYSTEM_ERROR} plus a remark on failure
+     * @throws RemotingCommandException declared by the signature; not thrown directly in this body
+     */
+    private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        long startTime = System.currentTimeMillis();
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        final CreateTopicListRequestBody requestBody =
+            CreateTopicListRequestBody.decode(request.getBody(), CreateTopicListRequestBody.class);
+        // Reject a request without a topic list instead of failing later with a NullPointerException.
+        if (requestBody == null || requestBody.getTopicConfigList() == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("The topic config list is missing from the request body.");
+            return response;
+        }
+        List<TopicConfig> topicConfigList = requestBody.getTopicConfigList();
+
+        StringBuilder builder = new StringBuilder();
+        for (TopicConfig topicConfig : topicConfigList) {
+            builder.append(topicConfig == null ? null : topicConfig.getTopicName()).append(";");
+        }
+        String topicNames = builder.toString();
+        LOGGER.info("AdminBrokerProcessor#updateAndCreateTopicList: topicNames: {}, called by {}", topicNames, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        // Tracks whether any validated topic is a system topic; the joined name string can never
+        // match a single system topic, so it must not be used for the metrics label.
+        boolean containsSystemTopic = false;
+        long executionTime;
+
+        try {
+            // Fully qualified so this block does not depend on the file's import list.
+            List<TopicConfig> changedTopicConfigs = new java.util.ArrayList<>(topicConfigList.size());
+
+            // Validate every topic before touching any state.
+            for (TopicConfig topicConfig : topicConfigList) {
+                if (topicConfig == null) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("The topic config list contains a null entry.");
+                    return response;
+                }
+                String topic = topicConfig.getTopicName();
+                TopicValidator.ValidateTopicResult result = TopicValidator.validateTopic(topic);
+                if (!result.isValid()) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark(result.getRemark());
+                    return response;
+                }
+                boolean systemTopic = TopicValidator.isSystemTopic(topic);
+                containsSystemTopic |= systemTopic;
+                if (brokerController.getBrokerConfig().isValidateSystemTopicWhenUpdateTopic()) {
+                    if (systemTopic) {
+                        response.setCode(ResponseCode.SYSTEM_ERROR);
+                        response.setRemark("The topic[" + topic + "] is conflict with system topic.");
+                        return response;
+                    }
+                }
+                if (topicConfig.getTopicMessageType() == TopicMessageType.MIXED
+                    && !brokerController.getBrokerConfig().isEnableMixedMessageType()) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("MIXED message type is not supported.");
+                    return response;
+                }
+                if (topicConfig.equals(this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic))) {
+                    LOGGER.info("Broker receive request to update or create topic={}, but topicConfig has no changes , so idempotent, caller address={}",
+                        topic, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    // Skip only this entry; returning here would drop the rest of the batch.
+                    continue;
+                }
+                changedTopicConfigs.add(topicConfig);
+            }
+
+            // Nothing differs from the stored configs: the whole request is idempotent.
+            if (changedTopicConfigs.isEmpty()) {
+                response.setCode(ResponseCode.SUCCESS);
+                return response;
+            }
+
+            this.brokerController.getTopicConfigManager().updateTopicConfigList(changedTopicConfigs);
+            if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+                for (TopicConfig topicConfig : changedTopicConfigs) {
+                    this.brokerController.registerSingleTopicAll(topicConfig);
+                }
+            } else {
+                this.brokerController.registerIncrementBrokerData(changedTopicConfigs, this.brokerController.getTopicConfigManager().getDataVersion());
+            }
+            response.setCode(ResponseCode.SUCCESS);
+        } catch (Exception e) {
+            LOGGER.error("Update / create topic failed for [{}]", request, e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+        finally {
+            // Runs on every exit path from the try block, including the early validation returns.
+            executionTime = System.currentTimeMillis() - startTime;
+            InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
+                InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
+            Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
+                .put(LABEL_INVOCATION_STATUS, status.getName())
+                .put(LABEL_IS_SYSTEM, containsSystemTopic)
+                .build();
+            BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
+        }
+        LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, executionTime);
+        return response;
+    }
+
private synchronized RemotingCommand
updateAndCreateStaticTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 1ed9cbab5f..d7c06180e9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.topic;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -490,7 +491,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
- public void updateTopicConfig(final TopicConfig topicConfig) {
+ private void updateSingleTopicConfigWithoutPersist(final TopicConfig
topicConfig) {
checkNotNull(topicConfig, "topicConfig shouldn't be null");
Map<String, String> newAttributes = request(topicConfig);
@@ -515,10 +516,18 @@ public class TopicConfigManager extends ConfigManager {
long stateMachineVersion = brokerController.getMessageStore() != null
? brokerController.getMessageStore().getStateMachineVersion() : 0;
dataVersion.nextVersion(stateMachineVersion);
+ }
+ public void updateTopicConfig(final TopicConfig topicConfig) {
+ updateSingleTopicConfigWithoutPersist(topicConfig);
this.persist(topicConfig.getTopicName(), topicConfig);
}
+    /**
+     * Applies each config through {@code updateSingleTopicConfigWithoutPersist} and then calls
+     * {@code persist()} once for the whole batch.
+     *
+     * @param topicConfigList configs to apply, in iteration order
+     */
+    public void updateTopicConfigList(final List<TopicConfig> topicConfigList) {
+        for (final TopicConfig topicConfig : topicConfigList) {
+            updateSingleTopicConfigWithoutPersist(topicConfig);
+        }
+        this.persist();
+    }
+
private synchronized void updateTieredStoreTopicMetadata(final TopicConfig
topicConfig, Map<String, String> newAttributes) {
if (!(brokerController.getMessageStore() instanceof
TieredMessageStore)) {
if
(newAttributes.get(TopicAttributes.TOPIC_RESERVE_TIME_ATTRIBUTE.getName()) !=
null) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 816ae877ac..f3d7e7c70f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -118,6 +118,7 @@ import
org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
@@ -150,6 +151,7 @@ import
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResult
import
org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.CreateTopicListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.DeleteAccessConfigRequestHeader;
@@ -430,6 +432,24 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQClientException(response.getCode(), response.getRemark());
}
+    /**
+     * Sends an {@code UPDATE_AND_CREATE_TOPIC_LIST} request carrying the given topic configs to a
+     * broker and waits synchronously for the reply.
+     *
+     * @param address broker address; passed through {@code MixAll.brokerVIPChannel} before use
+     * @param topicConfigList topic configs encoded into the request body
+     * @param timeoutMillis timeout handed to the synchronous invoke
+     * @throws MQClientException if the broker answers with any code other than {@code SUCCESS}
+     */
+    public void createTopicList(final String address, final List<TopicConfig> topicConfigList, final long timeoutMillis)
+        throws InterruptedException, RemotingException, MQClientException {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(
+            RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, new CreateTopicListRequestHeader());
+        request.setBody(new CreateTopicListRequestBody(topicConfigList).encode());
+
+        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), address);
+        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQClientException(response.getCode(), response.getRemark());
+        }
+    }
+
public void createPlainAccessConfig(final String addr, final
PlainAccessConfig plainAccessConfig,
final long timeoutMillis)
throws RemotingException, InterruptedException, MQClientException {
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 97d8d04e64..b0876c7c0d 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -1068,4 +1069,26 @@ public class MQClientAPIImplTest {
int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1",
"default-broker", 1000);
assertThat(topicCnt).isEqualTo(7);
}
+
+ @Test
+ public void testCreateTopicList_Success() throws RemotingException,
InterruptedException, MQClientException {
+ doAnswer((Answer<RemotingCommand>) mock -> {
+ RemotingCommand request = mock.getArgument(1);
+
+ RemotingCommand response =
RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }).when(remotingClient).invokeSync(anyString(),
any(RemotingCommand.class), anyLong());
+
+ final List<TopicConfig> topicConfigList = new LinkedList<>();
+ for (int i = 0; i < 16; i++) {
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setTopicName("Topic" + i);
+ topicConfigList.add(topicConfig);
+ }
+
+ mqClientAPI.createTopicList(brokerAddr, topicConfigList, 10000);
+ }
+
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 1de724e0f1..3be22fc56b 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -28,6 +28,7 @@ public class RequestCode {
public static final int QUERY_CONSUMER_OFFSET = 14;
public static final int UPDATE_CONSUMER_OFFSET = 15;
public static final int UPDATE_AND_CREATE_TOPIC = 17;
+ public static final int UPDATE_AND_CREATE_TOPIC_LIST = 18;
public static final int GET_ALL_TOPIC_CONFIG = 21;
public static final int GET_TOPIC_CONFIG_LIST = 22;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java
new file mode 100644
index 0000000000..a72be31ac9
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/CreateTopicListRequestBody.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.body;
+
+import java.util.List;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+/**
+ * Request body carrying the list of {@link TopicConfig} entries for a batch
+ * create-or-update topic request.
+ */
+public class CreateTopicListRequestBody extends RemotingSerializable {
+    /** Topic configs to create or update. */
+    @CFNotNull
+    private List<TopicConfig> topicConfigList;
+
+    /** Creates an empty body; the list stays {@code null} until it is set. */
+    public CreateTopicListRequestBody() {
+    }
+
+    /**
+     * Creates a body holding the given list. The reference is stored as-is, without copying.
+     *
+     * @param topicConfigList topic configs to carry
+     */
+    public CreateTopicListRequestBody(List<TopicConfig> topicConfigList) {
+        setTopicConfigList(topicConfigList);
+    }
+
+    public void setTopicConfigList(List<TopicConfig> topicConfigList) {
+        this.topicConfigList = topicConfigList;
+    }
+
+    public List<TopicConfig> getTopicConfigList() {
+        return this.topicConfigList;
+    }
+
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java
new file mode 100644
index 0000000000..615de750c4
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CreateTopicListRequestHeader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.header;
+
+import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.action.RocketMQAction;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;
+
+/**
+ * Request header for {@code RequestCode.UPDATE_AND_CREATE_TOPIC_LIST}. It declares no fields of
+ * its own beyond what {@code RpcRequestHeader} provides.
+ */
+@RocketMQAction(value = RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, action =
Action.CREATE)
+public class CreateTopicListRequestHeader extends RpcRequestHeader {
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ // Intentionally empty: this class declares no header fields to validate.
+
+ }
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index a02c878d96..37dd322488 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -195,6 +195,11 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config);
}
+ /**
+  * Delegates to {@code defaultMQAdminExtImpl.createAndUpdateTopicConfigList} with the same
+  * arguments.
+  *
+  * @param addr address passed through to the implementation
+  * @param topicConfigList topic configs passed through to the implementation
+  */
+ @Override
+ public void createAndUpdateTopicConfigList(String addr, List<TopicConfig>
topicConfigList) throws InterruptedException, RemotingException,
MQClientException {
+ defaultMQAdminExtImpl.createAndUpdateTopicConfigList(addr,
topicConfigList);
+ }
+
@Override
public void createAndUpdatePlainAccessConfig(String addr,
PlainAccessConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2046b1a44c..b5a20673da 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -275,6 +275,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr,
this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
+ /**
+  * Forwards the batch to {@code MQClientAPIImpl.createTopicList}, supplying the
+  * {@code timeoutMillis} value that is in scope here (it is not a parameter of this method).
+  *
+  * @param brokerAddr broker address handed to the client API
+  * @param topicConfigList topic configs handed to the client API
+  */
+ @Override
+ public void createAndUpdateTopicConfigList(final String brokerAddr,
+ final List<TopicConfig> topicConfigList) throws RemotingException,
InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().createTopicList(brokerAddr,
topicConfigList, timeoutMillis);
+ }
+
@Override
public void createAndUpdatePlainAccessConfig(String addr,
PlainAccessConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 50deb7edfc..96940c38b2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -92,6 +92,9 @@ public interface MQAdminExt extends MQAdmin {
final TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
+ /**
+  * Creates or updates a batch of topic configs at the given address in a single call.
+  *
+  * @param addr target address
+  * @param topicConfigList topic configs to create or update
+  */
+ void createAndUpdateTopicConfigList(final String addr,
+ final List<TopicConfig> topicConfigList) throws InterruptedException,
RemotingException, MQClientException;
+
void createAndUpdatePlainAccessConfig(final String addr,
final PlainAccessConfig plainAccessConfig) throws RemotingException,
MQBrokerException,
InterruptedException, MQClientException;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index f8b8ec248a..e785934ba3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -113,6 +113,7 @@ import
org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicStatusSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand;
import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
+import org.apache.rocketmq.tools.command.topic.UpdateTopicListSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
@@ -187,6 +188,7 @@ public class MQAdminStartup {
public static void initCommand() {
initCommand(new UpdateTopicSubCommand());
+ initCommand(new UpdateTopicListSubCommand());
initCommand(new DeleteTopicSubCommand());
initCommand(new UpdateSubGroupSubCommand());
initCommand(new SetConsumeModeSubCommand());
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java
new file mode 100644
index 0000000000..a246059e11
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommand.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.topic;
+
+import com.alibaba.fastjson2.JSON;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * mqadmin sub-command {@code updateTopicList}: reads a JSON array of {@link TopicConfig} from a
+ * file and submits it as one batch to a single broker ({@code -b}) or to every master broker
+ * address resolved for a cluster ({@code -c}).
+ */
+public class UpdateTopicListSubCommand implements SubCommand {
+    @Override
+    public String commandName() {
+        return "updateTopicList";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "create or update topic in batch";
+    }
+
+    /**
+     * Registers the options of this command: a required, mutually exclusive {@code -b}/{@code -c}
+     * group selecting the target, and a required {@code -f} option naming the JSON file.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance, after the additions
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        final OptionGroup optionGroup = new OptionGroup();
+        Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
+        optionGroup.addOption(opt);
+        opt = new Option("c", "clusterName", true, "create topic to which cluster");
+        optionGroup.addOption(opt);
+        optionGroup.setRequired(true);
+        options.addOptionGroup(optionGroup);
+
+        opt = new Option("f", "filename", true, "Path to a file with list of org.apache.rocketmq.common.TopicConfig in json format");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Parses the topic config file and submits the batch to the selected broker(s).
+     *
+     * <p>Returns without contacting any broker when the file does not exist or holds no configs.
+     * The usage help is printed only when neither {@code -b} nor {@code -c} was handled.
+     *
+     * @param commandLine parsed command line
+     * @param options option set, used when printing the usage help
+     * @param rpcHook hook handed to the admin client
+     * @throws SubCommandException wrapping any exception raised while reading, parsing or submitting
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+        final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        final String fileName = commandLine.getOptionValue('f').trim();
+
+        try {
+            final Path filePath = Paths.get(fileName);
+            if (!Files.exists(filePath)) {
+                System.out.printf("the file path %s does not exists%n", fileName);
+                return;
+            }
+            final byte[] topicConfigListBytes = Files.readAllBytes(filePath);
+            final List<TopicConfig> topicConfigs = JSON.parseArray(topicConfigListBytes, TopicConfig.class);
+            if (null == topicConfigs || topicConfigs.isEmpty()) {
+                return;
+            }
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddress = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.createAndUpdateTopicConfigList(brokerAddress, topicConfigs);
+
+                System.out.printf("submit batch of topic config to %s success, please check the result later.%n",
+                    brokerAddress);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                final String clusterName = commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+
+                Set<String> masterSet =
+                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String brokerAddress : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfigList(brokerAddress, topicConfigs);
+
+                    System.out.printf("submit batch of topic config to %s success, please check the result later.%n",
+                        brokerAddress);
+                }
+                // Return like the -b branch does; without this the usage help below was printed
+                // after every successful cluster-wide submission.
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java
new file mode 100644
index 0000000000..95bb579da8
--- /dev/null
+++
b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicListSubCommandTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.topic;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Checks that the {@code -b} and {@code -f} options of {@link UpdateTopicListSubCommand} are
+ * parsed from the command line.
+ */
+class UpdateTopicListSubCommandTest {
+
+    @Test
+    public void testArguments() {
+        final UpdateTopicListSubCommand subCommand = new UpdateTopicListSubCommand();
+        final Options baseOptions = ServerUtil.buildCommandlineOptions(new Options());
+        final Options cmdOptions = subCommand.buildCommandlineOptions(baseOptions);
+        final String[] args = {"-b 127.0.0.1:10911", "-f topics.json"};
+
+        final CommandLine parsed = ServerUtil.parseCmdLine(
+            "mqadmin " + subCommand.commandName(), args, cmdOptions, new DefaultParser());
+
+        // Each option and its value are passed as one token, so the parsed values are trimmed.
+        final String brokerAddr = parsed.getOptionValue('b').trim();
+        final String fileName = parsed.getOptionValue('f').trim();
+        assertEquals("127.0.0.1:10911", brokerAddr);
+        assertEquals("topics.json", fileName);
+    }
+}
\ No newline at end of file