This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7cb1b1ee86 [ISSUE #9467] Implement paged retrieval of Topic and
SubscriptionGroup information (#9468)
7cb1b1ee86 is described below
commit 7cb1b1ee86e2facaf0eaf1b19d3c20b5df1522b6
Author: ltamber <[email protected]>
AuthorDate: Wed Jul 2 09:36:54 2025 +0800
[ISSUE #9467] Implement paged retrieval of Topic and SubscriptionGroup
information (#9468)
* Implement paged retrieval of Topic and SubscriptionGroup information
- Implemented pagination logic to support data retrieval by sequence number
and maximum count.
- Added data version checking so that all pages are read from the same data version; retrieval restarts from the first page if the version changes in between.
- Extended the response to carry the total count (in the response header) together with
the current page of data.
- Added unit tests
* BrokerOuterAPI
* add timeout & log
---------
Co-authored-by: xiaoming.lt <[email protected]>
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 192 ++++++++++++++++++---
.../broker/processor/AdminBrokerProcessor.java | 117 ++++++++-----
.../subscription/SubscriptionGroupManager.java | 40 +++++
.../rocketmq/broker/topic/TopicConfigManager.java | 25 ++-
.../broker/topic/TopicQueueMappingManager.java | 17 ++
.../broker/processor/AdminBrokerProcessorTest.java | 189 +++++++++++++++++++-
.../broker/slave/SlaveSynchronizeAtomicTest.java | 5 +-
.../broker/slave/SlaveSynchronizeTest.java | 4 +-
.../subscription/SubscriptionGroupManagerTest.java | 80 ++++++++-
.../broker/topic/TopicConfigManagerTest.java | 74 ++++++++
.../org/apache/rocketmq/client/ClientConfig.java | 10 ++
.../rocketmq/client/impl/MQClientAPIImpl.java | 176 ++++++++++++++++---
.../org/apache/rocketmq/common/BrokerConfig.java | 19 ++
.../protocol/body/SubscriptionGroupWrapper.java | 10 ++
...a => GetAllSubscriptionGroupRequestHeader.java} | 41 ++++-
... => GetAllSubscriptionGroupResponseHeader.java} | 21 ++-
...er.java => GetAllTopicConfigRequestHeader.java} | 41 ++++-
.../header/GetAllTopicConfigResponseHeader.java | 12 ++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 14 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 15 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 12 +-
.../tools/admin/DefaultMQAdminExtTest.java | 3 +-
22 files changed, 982 insertions(+), 135 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 83edd88408..21ba349c84 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -23,9 +23,14 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -102,6 +107,10 @@ import
org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetBrokerMemberGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
@@ -139,6 +148,8 @@ import
org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.rpc.RpcClient;
import org.apache.rocketmq.remoting.rpc.RpcClientImpl;
@@ -761,22 +772,86 @@ public class BrokerOuterAPI {
return changedList;
}
- public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
- final String addr) throws RemotingConnectException,
RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+ public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(final
String addr)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ InterruptedException, MQBrokerException, RemotingCommandException {
+
+ DataVersion topicConfigDataVersion = null;
+ DataVersion mappingDataVersion = null;
+ long timeoutMills = getTimeoutMillis();
+ int topicSeq = 0;
+ long beginTime = System.nanoTime();
+ ConcurrentHashMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<>();
+ Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap = new
ConcurrentHashMap<>();
+ while (true) {
+ long leftTime = timeoutMills -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+ if (leftTime < 0) {
+ throw new RemotingTimeoutException("invokeSync call timeout");
+ }
- RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request,
3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return TopicConfigSerializeWrapper.decode(response.getBody(),
TopicConfigAndMappingSerializeWrapper.class);
+ GetAllTopicConfigRequestHeader requestHeader = new
GetAllTopicConfigRequestHeader();
+ requestHeader.setTopicSeq(topicSeq);
+ requestHeader.setMaxTopicNum(getMaxPageSize());
+
requestHeader.setDataVersion(Optional.ofNullable(topicConfigDataVersion).
+ map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+ LOGGER.info("getAllTopicConfig from seq {}, max {}, dataVersion
{}",
+ topicSeq, requestHeader.getMaxTopicNum(),
requestHeader.getDataVersion());
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG,
requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(
+ MixAll.brokerVIPChannel(true, addr), request, 30000);
+
+ assert response != null;
+ if (response.getCode() == SUCCESS) {
+ TopicConfigAndMappingSerializeWrapper
topicConfigSerializeWrapper =
+
TopicConfigAndMappingSerializeWrapper.decode(response.getBody(),
TopicConfigAndMappingSerializeWrapper.class);
+
topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+
topicQueueMappingDetailMap.putAll(topicConfigSerializeWrapper.getTopicQueueMappingDetailMap());
+ topicSeq +=
topicConfigSerializeWrapper.getTopicConfigTable().size();
+
+
+ DataVersion newDataVersion =
topicConfigSerializeWrapper.getDataVersion();
+ if (topicConfigDataVersion == null) {
+ // fill dataVersion before break the loop to compatible
with old version server
+ topicConfigDataVersion = newDataVersion;
+ mappingDataVersion =
topicConfigSerializeWrapper.getMappingDataVersion();
+ }
+
+ GetAllTopicConfigResponseHeader responseHeader =
+
response.decodeCommandCustomHeader(GetAllTopicConfigResponseHeader.class);
+ Integer totalTopicNum = Optional.ofNullable(responseHeader)
+
.map(GetAllTopicConfigResponseHeader::getTotalTopicNum).orElse(null);
+
+ if (Objects.isNull(totalTopicNum)) { // compatible with
old version server
+ // the server side don't support totalTopicNum, all data
is returned
+ break;
+ }
+
+ if (!Objects.equals(topicConfigDataVersion, newDataVersion)) {
+ LOGGER.error("dataVersion changed, currentDataVersion: {},
newDataVersion: {}", topicConfigDataVersion, newDataVersion);
+ topicConfigDataVersion = newDataVersion;
+ mappingDataVersion =
topicConfigSerializeWrapper.getMappingDataVersion();
+ topicSeq = 0;
+ topicConfigTable.clear();
+ continue;
+ }
+
+ if (topicSeq >= totalTopicNum - 1) {
+ LOGGER.info("get all topic config, totalTopicNum: {}",
totalTopicNum);
+ break;
+ }
+ } else {
+ throw new MQBrokerException(response.getCode(),
response.getRemark(), addr);
}
- default:
- break;
+
}
- throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
+ TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper =
new TopicConfigAndMappingSerializeWrapper();
+ topicConfigSerializeWrapper.setDataVersion(topicConfigDataVersion);
+ topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+ topicConfigSerializeWrapper.setMappingDataVersion(mappingDataVersion);
+
topicConfigSerializeWrapper.setTopicQueueMappingDetailMap(topicQueueMappingDetailMap);
+ return topicConfigSerializeWrapper;
}
public TimerCheckpoint getTimerCheckPoint(
@@ -849,21 +924,82 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
}
- public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
- final String addr) throws InterruptedException,
RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
null);
- RemotingCommand response = this.remotingClient.invokeSync(addr,
request, 3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return SubscriptionGroupWrapper.decode(response.getBody(),
SubscriptionGroupWrapper.class);
+ public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String
addr)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
+
+ long timeoutMills = getTimeoutMillis();
+ DataVersion currentDataVersion = null;
+ int groupSeq = 0;
+ long beginTime = System.nanoTime();
+ ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable
= new ConcurrentHashMap<>();
+ ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable =
new ConcurrentHashMap<>();
+
+ while (true) {
+ long leftTime = timeoutMills -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+ if (leftTime < 0) {
+ throw new RemotingTimeoutException("invokeSync call timeout");
+ }
+
+ GetAllSubscriptionGroupRequestHeader requestHeader = new
GetAllSubscriptionGroupRequestHeader();
+ requestHeader.setGroupSeq(groupSeq);
+ requestHeader.setMaxGroupNum(getMaxPageSize());
+
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
+ .map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+ LOGGER.info("getAllSubscriptionGroup from seq {}, max {},
dataVersion {}",
+ groupSeq, requestHeader.getMaxGroupNum(),
requestHeader.getDataVersion());
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
requestHeader);
+ RemotingCommand response = this.remotingClient.invokeSync(addr,
request, 30000);
+
+ assert response != null;
+ if (response.getCode() == SUCCESS) {
+ SubscriptionGroupWrapper subscriptionGroupWrapper =
+ SubscriptionGroupWrapper.decode(response.getBody(),
SubscriptionGroupWrapper.class);
+
subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
+
forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
+
+ DataVersion newDataVersion =
subscriptionGroupWrapper.getDataVersion();
+ if (currentDataVersion == null) {
+ // fill dataVersion before break the loop to compatible
with old version server
+ currentDataVersion = newDataVersion;
+ }
+
+ groupSeq +=
subscriptionGroupWrapper.getSubscriptionGroupTable().size();
+
+ GetAllSubscriptionGroupResponseHeader responseHeader =
+
response.decodeCommandCustomHeader(GetAllSubscriptionGroupResponseHeader.class);
+ Integer totalGroupNum = Optional.ofNullable(responseHeader)
+
.map(GetAllSubscriptionGroupResponseHeader::getTotalGroupNum).orElse(null);
+
+ if (Objects.isNull(totalGroupNum)) {
+ // the server side don't support totalGroupNum, all data
is returned
+ break;
+ }
+
+ if (!Objects.equals(currentDataVersion, newDataVersion)) {
+ LOGGER.error("dataVersion changed, currentDataVersion: {},
newDataVersion: {}",
+ currentDataVersion, newDataVersion);
+ currentDataVersion = newDataVersion;
+ groupSeq = 0;
+ subscriptionGroupTable.clear();
+ forbiddenTable.clear();
+ continue;
+ }
+
+ if (groupSeq >= totalGroupNum - 1) {
+ LOGGER.info("get all subscription group config,
totalGroupNum: {}", totalGroupNum);
+ break;
+ }
+ } else {
+ throw new MQBrokerException(response.getCode(),
response.getRemark(), addr);
}
- default:
- break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
+ SubscriptionGroupWrapper allSubscriptionGroup = new
SubscriptionGroupWrapper();
+ allSubscriptionGroup.setSubscriptionGroupTable(subscriptionGroupTable);
+ allSubscriptionGroup.setForbiddenTable(forbiddenTable);
+ allSubscriptionGroup.setDataVersion(currentDataVersion);
+ return allSubscriptionGroup;
}
public void registerRPCHook(RPCHook rpcHook) {
@@ -1491,4 +1627,12 @@ public class BrokerOuterAPI {
return pullResult;
}
+ private int getMaxPageSize() {
+ return 2000;
+ }
+
+ private long getTimeoutMillis() {
+ return TimeUnit.SECONDS.toMillis(60);
+ }
+
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index fd8d925e3b..4203b3af82 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -44,6 +45,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.auth.authentication.enums.UserType;
import
org.apache.rocketmq.auth.authentication.exception.AuthenticationException;
@@ -68,6 +70,8 @@ import
org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.metrics.InvocationStatus;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
@@ -137,6 +141,7 @@ import
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicList;
@@ -157,6 +162,9 @@ import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader
import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetBrokerConfigResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
@@ -816,40 +824,54 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
- private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) {
+ private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request)
+ throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
- // final GetAllTopicConfigResponseHeader responseHeader =
- // (GetAllTopicConfigResponseHeader) response.readCustomHeader();
+ final GetAllTopicConfigResponseHeader responseHeader =
+ (GetAllTopicConfigResponseHeader) response.readCustomHeader();
+ final GetAllTopicConfigRequestHeader requestHeader =
+
request.decodeCommandCustomHeader(GetAllTopicConfigRequestHeader.class);
+
+ String dataVersionStr = requestHeader.getDataVersion();
+ Integer topicSeq = requestHeader.getTopicSeq();
+ Integer maxTopicNum = requestHeader.getMaxTopicNum();
+
+ TopicConfigManager tcManager =
brokerController.getTopicConfigManager();
+ TopicQueueMappingManager tqmManager =
brokerController.getTopicQueueMappingManager();
TopicConfigAndMappingSerializeWrapper
topicConfigAndMappingSerializeWrapper = new
TopicConfigAndMappingSerializeWrapper();
+ if (!brokerController.getBrokerConfig().isEnableSplitMetadata()
+ || ObjectUtils.allNull(dataVersionStr, topicSeq, maxTopicNum)) {
// old client, return all topic config
-
topicConfigAndMappingSerializeWrapper.setDataVersion(this.brokerController.getTopicConfigManager().getDataVersion());
-
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(this.brokerController.getTopicConfigManager().getTopicConfigTable());
+
topicConfigAndMappingSerializeWrapper.setDataVersion(tcManager.getDataVersion());
+
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(tcManager.getTopicConfigTable());
-
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(this.brokerController.getTopicQueueMappingManager().getDataVersion());
-
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable());
+
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(tqmManager.getDataVersion());
+
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(tqmManager.getTopicQueueMappingTable());
+ } else {
+ int topicNum =
Math.min(brokerController.getBrokerConfig().getSplitMetadataSize(),
+ Optional.ofNullable(maxTopicNum).orElse(Integer.MAX_VALUE));
// use smaller value
+ ConcurrentHashMap<String, TopicConfig> subTopicConfigTable =
+ tcManager.subTopicConfigTable(dataVersionStr, topicSeq,
topicNum);
+
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(subTopicConfigTable);
+
topicConfigAndMappingSerializeWrapper.setDataVersion(tcManager.getDataVersion());
- String content = topicConfigAndMappingSerializeWrapper.toJson();
- if (content != null && content.length() > 0) {
- try {
- response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
- } catch (UnsupportedEncodingException e) {
- LOGGER.error("", e);
+
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(tqmManager.getDataVersion());
+
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(
+
tqmManager.subTopicQueueMappingTable(subTopicConfigTable.keySet()));
+ }
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UnsupportedEncodingException " +
e.getMessage());
- return response;
- }
+
responseHeader.setTotalTopicNum(tcManager.getTopicConfigTable().size());
+ String content = topicConfigAndMappingSerializeWrapper.toJson();
+ if (StringUtils.isNotBlank(content)) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ response.setBody(content.getBytes(StandardCharsets.UTF_8));
} else {
LOGGER.error("No topic in this broker, client: {}",
ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("No topic in this broker");
- return response;
}
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
-
return response;
}
@@ -1584,28 +1606,45 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
- final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- String content =
this.brokerController.getSubscriptionGroupManager().encode();
- if (content != null && content.length() > 0) {
- try {
- response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
- } catch (UnsupportedEncodingException e) {
- LOGGER.error("UnsupportedEncodingException
getAllSubscriptionGroup", e);
-
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("UnsupportedEncodingException " +
e.getMessage());
- return response;
- }
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(GetAllSubscriptionGroupResponseHeader.class);
+ final GetAllSubscriptionGroupResponseHeader responseHeader =
+ (GetAllSubscriptionGroupResponseHeader)
response.readCustomHeader();
+ final GetAllSubscriptionGroupRequestHeader requestHeader =
+
request.decodeCommandCustomHeader(GetAllSubscriptionGroupRequestHeader.class);
+
+ String dataVersionStr = requestHeader.getDataVersion();
+ Integer groupSeq = requestHeader.getGroupSeq();
+ Integer maxGroupNum = requestHeader.getMaxGroupNum();
+
+ SubscriptionGroupManager sgManager =
this.brokerController.getSubscriptionGroupManager();
+
+ SubscriptionGroupWrapper subscriptionGroupWrapper = new
SubscriptionGroupWrapper();
+ if (!brokerController.getBrokerConfig().isEnableSplitMetadata()
+ || ObjectUtils.allNull(dataVersionStr, groupSeq, maxGroupNum)) {
+
subscriptionGroupWrapper.setSubscriptionGroupTable(sgManager.getSubscriptionGroupTable());
+
subscriptionGroupWrapper.setForbiddenTable(sgManager.getForbiddenTable());
+
subscriptionGroupWrapper.setDataVersion(sgManager.getDataVersion());
+ } else {
+ int groupNum =
Math.min(brokerController.getBrokerConfig().getSplitMetadataSize(),
+ Optional.ofNullable(maxGroupNum).orElse(Integer.MAX_VALUE));
+ ConcurrentMap<String, SubscriptionGroupConfig> subGroupTable =
+ sgManager.subGroupTable(dataVersionStr, groupSeq, groupNum);
+ subscriptionGroupWrapper.setSubscriptionGroupTable(subGroupTable);
+
subscriptionGroupWrapper.setDataVersion(sgManager.getDataVersion());
+
subscriptionGroupWrapper.setForbiddenTable(sgManager.subForbiddenTable(subGroupTable.keySet()));
+ }
+
+
responseHeader.setTotalGroupNum(sgManager.getSubscriptionGroupTable().size());
+ String content = subscriptionGroupWrapper.toJson();
+ if (StringUtils.isNotBlank(content)) {
+ response.setBody(content.getBytes(StandardCharsets.UTF_8));
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
} else {
LOGGER.error("No subscription group in this broker, client:{} ",
ctx.channel().remoteAddress());
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("No subscription group in this broker");
- return response;
}
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
-
return response;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index f3e669fb3e..c7083365be 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -22,8 +22,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -322,10 +330,42 @@ public class SubscriptionGroupManager extends
ConfigManager {
return subscriptionGroupTable;
}
+ public ConcurrentHashMap<String, SubscriptionGroupConfig>
subGroupTable(String dataVersion, int groupSeq,
+ int maxGroupNum) {
+ // [groupSeq, groupSeq + maxGroupNum)
+ int beginIndex = groupSeq;
+ if (StringUtils.isBlank(dataVersion) ||
!Objects.equals(DataVersion.fromJson(dataVersion, DataVersion.class),
this.dataVersion)) {
+ beginIndex = 0;
+ log.info("get sub subscription group table from {} due to {}",
beginIndex,
+ StringUtils.isBlank(dataVersion) ? "DataVersion Empty" :
"DataVersion Changed");
+ }
+
+ ConcurrentHashMap<String, SubscriptionGroupConfig> subGroupTable = new
ConcurrentHashMap<>();
+ if (beginIndex < subscriptionGroupTable.size()) {
+ int endIndex = Math.min(beginIndex + maxGroupNum,
subscriptionGroupTable.size());
+
+ ImmutableSortedMap<String, SubscriptionGroupConfig> sortedMap =
ImmutableSortedMap.copyOf(subscriptionGroupTable);
+
subGroupTable.putAll(sortedMap.subMap(sortedMap.keySet().asList().get(beginIndex),true,
+ sortedMap.keySet().asList().get(endIndex - 1),true));
+ }
+
+ return subGroupTable;
+ }
+
public ConcurrentMap<String, ConcurrentMap<String, Integer>>
getForbiddenTable() {
return forbiddenTable;
}
+ public ConcurrentMap<String, ConcurrentMap<String, Integer>>
subForbiddenTable(Set<String> groupSet) {
+ if (MapUtils.isEmpty(forbiddenTable) ||
CollectionUtils.isEmpty(groupSet)) {
+ return Maps.newConcurrentMap();
+ }
+
+ return forbiddenTable.entrySet().stream()
+ .filter(e -> groupSet.contains(e.getKey()))
+ .collect(Collectors.toConcurrentMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
public void setForbiddenTable(
ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable) {
this.forbiddenTable = forbiddenTable;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index b20cafc101..8211a8fb09 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,7 +30,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.collect.ImmutableMap;
-
+import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
@@ -703,6 +704,28 @@ public class TopicConfigManager extends ConfigManager {
return topicConfigTable;
}
+ public ConcurrentHashMap<String, TopicConfig> subTopicConfigTable(String
dataVersion, int topicSeq,
+ int maxTopicNum) {
+ // [topicSeq, topicSeq + maxTopicNum)
+ int beginIndex = topicSeq;
+ if (StringUtils.isBlank(dataVersion) ||
!Objects.equals(DataVersion.fromJson(dataVersion, DataVersion.class),
this.dataVersion)) {
+ beginIndex = 0;
+ log.info("get sub topic config table from {} due to {}",
beginIndex,
+ StringUtils.isBlank(dataVersion) ? "DataVersion Empty" :
"DataVersion Changed");
+ }
+
+ ConcurrentHashMap<String, TopicConfig> subTopicConfigTable = new
ConcurrentHashMap<>();
+ if (beginIndex < topicConfigTable.size()) {
+ int endIndex = Math.min(beginIndex + maxTopicNum,
topicConfigTable.size());
+
+ ImmutableSortedMap<String, TopicConfig> sortedMap =
ImmutableSortedMap.copyOf(topicConfigTable);
+
subTopicConfigTable.putAll(sortedMap.subMap(sortedMap.keySet().asList().get(beginIndex),true,
+ sortedMap.keySet().asList().get(endIndex - 1),true));
+ }
+
+ return subTopicConfigTable;
+ }
+
private Map<String, String> request(TopicConfig topicConfig) {
return topicConfig.getAttributes() == null ? new HashMap<>() :
topicConfig.getAttributes();
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 6b9cf15938..4b0714decb 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -17,12 +17,19 @@
package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Maps;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -178,6 +185,16 @@ public class TopicQueueMappingManager extends
ConfigManager {
return topicQueueMappingTable;
}
+ public ConcurrentMap<String, TopicQueueMappingDetail>
subTopicQueueMappingTable(Set<String> topicSet) {
+ if (MapUtils.isEmpty(topicQueueMappingTable) ||
CollectionUtils.isEmpty(topicSet)) {
+ return Maps.newConcurrentMap();
+ }
+
+ return topicQueueMappingTable.entrySet().stream()
+ .filter(e -> topicSet.contains(e.getKey()))
+ .collect(Collectors.toConcurrentMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
public DataVersion getDataVersion() {
return dataVersion;
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index a6bcca954d..f3d0eb0782 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -17,9 +17,12 @@
package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.auth.authentication.enums.UserType;
import
org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
import org.apache.rocketmq.auth.authentication.model.Subject;
@@ -61,6 +64,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -71,6 +75,8 @@ import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
@@ -81,6 +87,9 @@ import
org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
@@ -137,11 +146,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -455,12 +468,166 @@ public class AdminBrokerProcessorTest {
@Test
public void testGetAllTopicConfig() throws Exception {
- GetAllTopicConfigResponseHeader getAllTopicConfigResponseHeader = new
GetAllTopicConfigResponseHeader();
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG,
getAllTopicConfigResponseHeader);
+ // The command is created with a request header, so the variable is named for what it holds.
+ GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+
+ /**
+ * Sends GET_ALL_TOPIC_CONFIG first without a custom header (old client) and then page by page (new client),
+ * and verifies that the topic configs collected from the pages equal the broker's topic config table.
+ *
+ * <p>Once more than half of the topics have been fetched, the broker's data version is bumped a single time so
+ * that the restart-from-zero branch taken on a data version mismatch is exercised too.
+ *
+ * @param enableSplitMetadata value written to the broker's {@code enableSplitMetadata} switch before requesting
+ * @throws RemotingCommandException declared by the processor and header-decoding calls used here
+ */
+ private void getAllTopicConfig(boolean enableSplitMetadata) throws RemotingCommandException {
+ brokerController.getBrokerConfig().setEnableSplitMetadata(enableSplitMetadata);
+
+ // old client, request null
+ RemotingCommand requestOldClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+ RemotingCommand responseOldClient = adminBrokerProcessor.processRequest(handlerContext, requestOldClient);
+ assertThat(responseOldClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapperOldClient =
+ TopicConfigSerializeWrapper.decode(responseOldClient.getBody(), TopicConfigSerializeWrapper.class);
+ assertThat(Maps.difference(topicConfigSerializeWrapperOldClient.getTopicConfigTable(),
+ brokerController.getTopicConfigManager().getTopicConfigTable()).areEqual()).isTrue();
+
+ // new client, request seq from 0
+ AtomicBoolean dataVersionChanged = new AtomicBoolean(false);
+ int topicSeq = 0;
+ DataVersion dataVersion = null;
+ int pageSize = ThreadLocalRandom.current().nextInt(500, brokerController.getBrokerConfig().getSplitMetadataSize());
+ ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+ while (true) {
+ GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
+ requestHeader.setTopicSeq(topicSeq);
+ requestHeader.setMaxTopicNum(pageSize);
+ requestHeader.setDataVersion(Optional.ofNullable(dataVersion).map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+ RemotingCommand requestNewClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
+ requestNewClient.makeCustomHeaderToNet();
+
+ RemotingCommand responseNewClient = adminBrokerProcessor.processRequest(handlerContext, requestNewClient);
+ GetAllTopicConfigResponseHeader responseHeader = (GetAllTopicConfigResponseHeader) responseNewClient.readCustomHeader();
+ assertThat(responseNewClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+ TopicConfigSerializeWrapper.decode(responseNewClient.getBody(), TopicConfigSerializeWrapper.class);
+ topicSeq += topicConfigSerializeWrapper.getTopicConfigTable().size();
+
+ assertThat(responseHeader.getTotalTopicNum())
+ .isEqualTo(brokerController.getTopicConfigManager().getTopicConfigTable().size());
+ assertThat(topicConfigSerializeWrapper.getDataVersion())
+ .isEqualTo(brokerController.getTopicConfigManager().getDataVersion());
+
+ DataVersion newDataVersion = topicConfigSerializeWrapper.getDataVersion();
+ if (dataVersion == null) {
+ dataVersion = newDataVersion;
+ }
+
+ // mock server side data version changed
+ if (topicSeq > responseHeader.getTotalTopicNum() / 2 && dataVersionChanged.compareAndSet(false, true)) {
+ brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+ }
+
+ if (!Objects.equals(dataVersion, newDataVersion)) {
+ dataVersion = newDataVersion;
+ topicSeq = 0; // data version diff, from 0
+ topicConfigTable.clear();
+ continue;
+ }
+
+ topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+ // topicSeq is the number of topics fetched so far, so paging is complete only once it reaches the
+ // total; stopping at total - 1 would leave the last topic unfetched when a full page ends there.
+ if (topicSeq >= responseHeader.getTotalTopicNum()) {
+ break;
+ } else {
+ assertThat(topicConfigSerializeWrapper.getTopicConfigTable().size()).isEqualTo(pageSize);
+ }
+ }
+ assertThat(Maps.difference(topicConfigTable, brokerController.getTopicConfigManager().getTopicConfigTable()).areEqual()).isTrue();
+ }
+
+ /**
+ * Fills the topic config table with 50000 topics and runs the paged retrieval check twice: with split
+ * metadata enabled on the broker and then with it disabled.
+ */
+ @Test
+ public void testGetAllTopicConfigWithRequestHeader() throws
RemotingCommandException {
+ // topic indexes [0, 50000)
+ fillTopicConfigTable(50000);
+
+ getAllTopicConfig(true);
+ getAllTopicConfig(false); // split disabled on the broker side: all topic configs are returned
+ }
+
+
+ /**
+ * Sends GET_ALL_SUBSCRIPTIONGROUP_CONFIG first without a custom header (old client) and then page by page
+ * (new client), and verifies that the subscription groups and forbidden entries collected from the pages
+ * equal the tables held by the broker's subscription group manager.
+ *
+ * <p>Once more than half of the groups have been fetched, the broker's data version is bumped a single time
+ * so that the restart-from-zero branch taken on a data version mismatch is exercised too.
+ *
+ * @param enableSplitMetadata value written to the broker's {@code enableSplitMetadata} switch before requesting
+ * @throws RemotingCommandException declared by the processor and header-decoding calls used here
+ */
+ private void getAllSubscriptionGroup(boolean enableSplitMetadata) throws RemotingCommandException {
+ brokerController.getBrokerConfig().setEnableSplitMetadata(enableSplitMetadata);
+
+ // old client, request null
+ RemotingCommand requestOldClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+ RemotingCommand responseOldClient = adminBrokerProcessor.processRequest(handlerContext, requestOldClient);
+ assertThat(responseOldClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ // new client, request from 0
+ AtomicBoolean dataVersionChanged = new AtomicBoolean(false);
+ int groupSeq = 0;
+ int pageSize = ThreadLocalRandom.current().nextInt(500, brokerController.getBrokerConfig().getSplitMetadataSize());
+ DataVersion dataVersion = null;
+ ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
+ ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>();
+ while (true) {
+ GetAllSubscriptionGroupRequestHeader requestHeader = new GetAllSubscriptionGroupRequestHeader();
+ requestHeader.setGroupSeq(groupSeq);
+ requestHeader.setMaxGroupNum(pageSize);
+ requestHeader.setDataVersion(Optional.ofNullable(dataVersion).map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+ RemotingCommand requestNewClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
+ requestNewClient.makeCustomHeaderToNet();
+ RemotingCommand responseNewClient = adminBrokerProcessor.processRequest(handlerContext, requestNewClient);
+ GetAllSubscriptionGroupResponseHeader responseHeader = (GetAllSubscriptionGroupResponseHeader) responseNewClient.readCustomHeader();
+ assertThat(responseNewClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ SubscriptionGroupWrapper subscriptionGroupWrapper =
+ SubscriptionGroupWrapper.decode(responseNewClient.getBody(), SubscriptionGroupWrapper.class);
+
+ groupSeq += subscriptionGroupWrapper.getSubscriptionGroupTable().size();
+ DataVersion newDataVersion = subscriptionGroupWrapper.getDataVersion();
+
+ assertThat(responseHeader.getTotalGroupNum()).isEqualTo(
+ brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().size());
+ assertThat(newDataVersion).isEqualTo(brokerController.getSubscriptionGroupManager().getDataVersion());
+
+ if (dataVersion == null) {
+ dataVersion = newDataVersion;
+ }
+
+ // mock server side data version changed
+ if (groupSeq > responseHeader.getTotalGroupNum() / 2 && dataVersionChanged.compareAndSet(false, true)) {
+ brokerController.getSubscriptionGroupManager().getDataVersion().nextVersion();
+ }
+
+ if (!Objects.equals(dataVersion, newDataVersion)) {
+ dataVersion = newDataVersion;
+ groupSeq = 0; // data version diff, from 0
+ subscriptionGroupTable.clear();
+ forbiddenTable.clear();
+ continue;
+ }
+
+ subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
+ forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
+ // groupSeq is the number of groups fetched so far, so paging is complete only once it reaches the
+ // total; stopping at total - 1 would leave the last group unfetched when a full page ends there.
+ if (groupSeq >= responseHeader.getTotalGroupNum()) {
+ break;
+ } else {
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().size()).isEqualTo(pageSize);
+ }
+ }
+ assertThat(Maps.difference(subscriptionGroupTable, brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable()).areEqual()).isTrue();
+ assertThat(Maps.difference(forbiddenTable, brokerController.getSubscriptionGroupManager().getForbiddenTable()).areEqual()).isTrue();
+ }
+
+ /**
+ * Fills the subscription group table with 50000 groups and runs the paged retrieval check twice: with split
+ * metadata enabled on the broker and then with it disabled.
+ */
+ @Test
+ public void testGetAllSubscriptionGroupWithRequestHeader() throws
RemotingCommandException {
+ // group indexes [0, 50000)
+ fillSubscriptionGroupManager(50000);
+
+ getAllSubscriptionGroup(true);
+ getAllSubscriptionGroup(false);
+
+ }
+
@Test
public void testUpdateBrokerConfig() throws Exception {
handlerContext = mock(ChannelHandlerContext.class);
@@ -1434,4 +1601,24 @@ public class AdminBrokerProcessorTest {
private boolean notToBeExecuted() {
return MixAll.isMac();
}
+
+ /**
+ * Puts {@code num} topic configs, named with the pattern {@code topic%05d} for indexes {@code num - 1} down
+ * to 0, straight into the broker's topic config table. Every topic gets read and write permission.
+ *
+ * @param num number of topics to add
+ */
+ private void fillTopicConfigTable(int num) {
+ final int readWritePerm = PermName.PERM_READ | PermName.PERM_WRITE;
+ int index = num;
+ // Inserts from the highest index down to 0.
+ while (--index >= 0) {
+ String name = String.format("topic%05d", index);
+ brokerController.getTopicConfigManager().getTopicConfigTable()
+ .put(name, new TopicConfig(name, 1, 1, readWritePerm, 0));
+ }
+ }
+
+ /**
+ * Puts {@code num} subscription group configs, named with the pattern {@code group-%05d} for indexes
+ * {@code num - 1} down to 0, straight into the broker's subscription group table. Every group carries the
+ * single attribute {@code "+test" -> "true"}.
+ *
+ * @param num number of groups to add
+ */
+ private void fillSubscriptionGroupManager(int num) {
+ int index = num;
+ // Inserts from the highest index down to 0.
+ while (--index >= 0) {
+ String name = String.format("group-%05d", index);
+ SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+ config.setGroupName(name);
+ Map<String, String> attributes = ImmutableMap.of("+test", "true");
+ config.setAttributes(attributes);
+ brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put(name, config);
+ }
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
index 75db22e7e7..714dcd967f 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
@@ -29,6 +29,7 @@ import
org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -113,8 +114,8 @@ public class SlaveSynchronizeAtomicTest {
@Test
public void testSyncAtomically()
- throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException,
- InterruptedException {
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException,
+ InterruptedException, RemotingCommandException {
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
index 95db733d0d..c9461c4224 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -132,7 +133,8 @@ public class SlaveSynchronizeTest {
}
@Test
- public void testSyncAll() throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException, UnsupportedEncodingException {
+ public void testSyncAll() throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException,
+ MQBrokerException, InterruptedException, UnsupportedEncodingException,
RemotingCommandException {
TopicConfig newTopicConfig = new TopicConfig("NewTopic");
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig));
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
index 3384d479c6..3c975a599b 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -24,13 +24,18 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -45,7 +50,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-
@RunWith(MockitoJUnitRunner.class)
public class SubscriptionGroupManagerTest {
private String group = "group";
@@ -58,9 +62,6 @@ public class SubscriptionGroupManagerTest {
@Before
public void before() {
- if (notToBeExecuted()) {
- return;
- }
SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
"test",
false,
@@ -166,4 +167,75 @@ public class SubscriptionGroupManagerTest {
}
+ /**
+ * Checks {@code subGroupTable(dataVersionJson, beginIndex, maxNum)} against an empty table and against a
+ * table of 50000 groups named {@code group-%05d}: null data version, a data version that differs from the
+ * manager's, a begin index equal to the table size, a regular page, and a final page that is not full.
+ */
+ @Test
+ public void testSubGroupTable() {
+ // Empty SubscriptionGroupManager
+ subscriptionGroupManager.getSubscriptionGroupTable().clear();
+ Map<String, SubscriptionGroupConfig> result =
+
subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
0, 200);
+ assertThat(result).isEmpty();
+
+ // fill SubscriptionGroupManager
+ int totalGroupNum = 50000;
+ fillSubscriptionGroupManager(totalGroupNum);
+
+ // Null DataVersion
+ int beginIndex = 0, maxNum = 200;
+ int endIndex = beginIndex + maxNum - 1;
+ result = subscriptionGroupManager.subGroupTable(null, beginIndex,
maxNum);
+
+ Assert.assertEquals(maxNum, result.size());
+ Assert.assertTrue(result.containsKey(String.format("group-%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
endIndex + 1)));
+
+ // Different DataVersion: index 300 is requested, but the assertions below still use beginIndex 0 and
+ // endIndex 199, i.e. they expect the page starting at index 0 to come back.
+ DataVersion differentVersion = new DataVersion();
+ differentVersion.setCounter(new AtomicLong(1000L)); // different
counter
+ differentVersion.setTimestamp(System.currentTimeMillis());
+ result =
subscriptionGroupManager.subGroupTable(differentVersion.toJson(), 300, maxNum);
+
+ Assert.assertEquals(maxNum, result.size());
+ Assert.assertTrue(result.containsKey(String.format("group-%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
endIndex + 1)));
+
+ // BeginIndexOutOfRange: begin index equals the number of groups
+ result =
subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
totalGroupNum, 200);
+
+ Assert.assertTrue(result.isEmpty());
+
+ // Normal Case
+ beginIndex = 300;
+ endIndex = beginIndex + maxNum - 1;
+ result =
subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
beginIndex, maxNum);
+
+ Assert.assertEquals(maxNum, result.size());
+ Assert.assertTrue(result.containsKey(String.format("group-%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
endIndex + 1)));
+
+ // Last page of the subscription group table is not full: fewer than maxNum groups remain after beginIndex
+ beginIndex = 49950;
+ endIndex =
Math.min(subscriptionGroupManager.getSubscriptionGroupTable().size() - 1,
beginIndex + maxNum - 1);
+ result =
subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
beginIndex, maxNum);
+
+ Assert.assertEquals(totalGroupNum - beginIndex, result.size());
+ Assert.assertTrue(result.containsKey(String.format("group-%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("group-%05d",
endIndex + 1)));
+
+ }
+
+ /**
+ * Puts {@code num} subscription group configs, named with the pattern {@code group-%05d} for indexes
+ * {@code num - 1} down to 0, straight into the manager's subscription group table. Every group carries the
+ * single attribute {@code "+test" -> "true"}.
+ *
+ * @param num number of groups to add
+ */
+ private void fillSubscriptionGroupManager(int num) {
+ int index = num;
+ // Inserts from the highest index down to 0.
+ while (--index >= 0) {
+ String name = String.format("group-%05d", index);
+ SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+ config.setGroupName(name);
+ Map<String, String> attributes = ImmutableMap.of("+test", "true");
+ config.setAttributes(attributes);
+ subscriptionGroupManager.getSubscriptionGroupTable().put(name, config);
+ }
+ }
+
}
\ No newline at end of file
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 3fd1d14c3a..5b2ea0b4d5 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -22,6 +22,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
@@ -32,7 +35,9 @@ import org.apache.rocketmq.common.attribute.BooleanAttribute;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.EnumAttribute;
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
@@ -326,4 +331,73 @@ public class TopicConfigManagerTest {
TopicAttributes.ALL.putAll(supportedAttributes);
}
+
+ /**
+ * Replaces the manager's topic config table with a fresh one holding {@code num} topics, named with the
+ * pattern {@code topic%05d} for indexes {@code num - 1} down to 0. Every topic gets read and write permission.
+ *
+ * @param num number of topics the new table contains
+ */
+ private void fillTopicConfigTable(int num) {
+ final int readWritePerm = PermName.PERM_READ | PermName.PERM_WRITE;
+ ConcurrentHashMap<String, TopicConfig> freshTable = new ConcurrentHashMap<>();
+ int index = num;
+ // Inserts from the highest index down to 0.
+ while (--index >= 0) {
+ String name = String.format("topic%05d", index);
+ freshTable.put(name, new TopicConfig(name, 1, 1, readWritePerm, 0));
+ }
+ topicConfigManager.setTopicConfigTable(freshTable);
+ }
+
+ /**
+ * Checks {@code subTopicConfigTable(dataVersionJson, beginIndex, maxNum)} against an empty table and against
+ * a table of 50000 topics named {@code topic%05d}: null data version, a data version that differs from the
+ * manager's, a begin index equal to the table size, a regular page, and a final page that is not full.
+ */
+ @Test
+ public void testSubTopicConfigTable() {
+ // Empty TopicConfigTable
+ topicConfigManager.getTopicConfigTable().clear();
+ Map<String, TopicConfig> result =
topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
0, 200);
+ Assert.assertTrue(result.isEmpty());
+
+ // init table, topic range [0, N)
+ final int totalTopicNum = 50000;
+ fillTopicConfigTable(totalTopicNum);
+
+ // Null DataVersion
+ int beginIndex = 0, maxNum = 200;
+ int endIndex = beginIndex + maxNum - 1;
+ result = topicConfigManager.subTopicConfigTable(null, beginIndex,
maxNum);
+
+ Assert.assertEquals(maxNum, result.size());
+ Assert.assertTrue(result.containsKey(String.format("topic%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
endIndex + 1)));
+
+ // Different DataVersion: index 300 is requested, but the assertions below still use beginIndex 0 and
+ // endIndex 199, i.e. they expect the page starting at index 0 to come back.
+ DataVersion differentVersion = new DataVersion();
+ differentVersion.setCounter(new AtomicLong(1000L)); // different
counter
+ differentVersion.setTimestamp(System.currentTimeMillis());
+ result =
topicConfigManager.subTopicConfigTable(differentVersion.toJson(), 300, maxNum);
+
+ Assert.assertEquals(maxNum, result.size());
+ Assert.assertTrue(result.containsKey(String.format("topic%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
endIndex + 1)));
+
+ // BeginIndexOutOfRange: begin index equals the number of topics
+ result =
topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
totalTopicNum, 200);
+
+ Assert.assertTrue(result.isEmpty());
+
+ // Normal Case
+ beginIndex = 300;
+ endIndex = beginIndex + maxNum - 1;
+ result =
topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
beginIndex, maxNum);
+
+ Assert.assertEquals(maxNum, result.size());
+ Assert.assertTrue(result.containsKey(String.format("topic%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
endIndex + 1)));
+
+ // Last page of the topic config table is not full: fewer than maxNum topics remain after beginIndex
+ beginIndex = 49950;
+ endIndex = Math.min(topicConfigManager.getTopicConfigTable().size() -
1, beginIndex + maxNum - 1);
+ result =
topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
beginIndex, maxNum);
+
+ Assert.assertEquals(totalTopicNum - beginIndex, result.size());
+ Assert.assertTrue(result.containsKey(String.format("topic%05d",
ThreadLocalRandom.current().nextInt(beginIndex, endIndex))));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
beginIndex - 1)));
+ Assert.assertFalse(result.containsKey(String.format("topic%05d",
endIndex + 1)));
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 696b073b37..79cb04af1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -111,6 +111,8 @@ public class ClientConfig {
*/
protected String traceTopic;
+ /**
+ * Page size sent to the broker as the maximum number of entries per request when the client fetches all
+ * topic configs or all subscription groups page by page.
+ */
+ protected int maxPageSizeInGetMetadata = 2000;
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -515,6 +517,14 @@ public class ClientConfig {
this.traceTopic = traceTopic;
}
+ /**
+ * @return the current value of {@code maxPageSizeInGetMetadata}
+ */
+ public int getMaxPageSizeInGetMetadata() {
+ return maxPageSizeInGetMetadata;
+ }
+
+ /**
+ * Stores the given value in {@code maxPageSizeInGetMetadata}; the value is not validated here.
+ *
+ * @param maxPageSizeInGetMetadata new page size
+ */
+ public void setMaxPageSizeInGetMetadata(int maxPageSizeInGetMetadata) {
+ this.maxPageSizeInGetMetadata = maxPageSizeInGetMetadata;
+ }
+
@Override
public String toString() {
return "ClientConfig{" +
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c001b33fa9..95bb0e8a96 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -85,6 +85,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -151,6 +152,10 @@ import
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonReq
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
@@ -242,8 +247,13 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
@@ -2806,21 +2816,81 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
throw new MQClientException(response.getCode(), response.getRemark());
}
- public SubscriptionGroupWrapper getAllSubscriptionGroup(final String
brokerAddr,
- long timeoutMillis) throws InterruptedException,
- RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
null);
- RemotingCommand response = this.remotingClient
-
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
brokerAddr), request, timeoutMillis);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return SubscriptionGroupWrapper.decode(response.getBody(),
SubscriptionGroupWrapper.class);
+ /**
+ * Fetches all subscription group configs and the forbidden table from a broker, page by page.
+ *
+ * <p>Every request carries the sequence number to start from, the page size taken from
+ * {@code clientConfig.getMaxPageSizeInGetMetadata()} and the data version seen so far. When a response
+ * carries a different data version, everything collected so far is dropped and paging restarts from
+ * sequence 0. When the response header has no total group number, the response is treated as the complete
+ * data set and paging stops.
+ *
+ * @param brokerAddr broker address to query
+ * @param timeoutMillis overall time budget for all pages; each request only gets the time that is left
+ * @return a wrapper holding the merged subscription group table, forbidden table and data version
+ * @throws RemotingTimeoutException if the time budget is used up before all pages are fetched
+ * @throws MQBrokerException if the broker answers with a non-success response code
+ * @throws RemotingCommandException propagated from decoding the response header
+ */
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
+
+ DataVersion currentDataVersion = null;
+ int groupSeq = 0;
+ ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
+ ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>();
+ long beginTime = System.nanoTime();
+ while (true) {
+ long leftTime = timeoutMillis - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+ // Same guard as getAllTopicConfig: never issue a request with no time left.
+ if (leftTime <= 0) {
+ throw new RemotingTimeoutException("invokeSync call timeout");
+ }
+
+ GetAllSubscriptionGroupRequestHeader requestHeader = new GetAllSubscriptionGroupRequestHeader();
+ requestHeader.setGroupSeq(groupSeq);
+ requestHeader.setMaxGroupNum(clientConfig.getMaxPageSizeInGetMetadata());
+ requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
+ .map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+ log.info("getAllSubscriptionGroup from seq {}, max {}, dataVersion {}",
+ groupSeq, requestHeader.getMaxGroupNum(), requestHeader.getDataVersion());
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
+ RemotingCommand response = this.remotingClient.invokeSync(
+ MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, leftTime);
+
+ assert response != null;
+ if (response.getCode() == SUCCESS) {
+ SubscriptionGroupWrapper subscriptionGroupWrapper =
+ SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
+ subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
+ forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
+
+ DataVersion newDataVersion = subscriptionGroupWrapper.getDataVersion();
+ if (currentDataVersion == null) {
+ // fill dataVersion before break the loop to compatible with old version server
+ currentDataVersion = newDataVersion;
+ }
+
+ groupSeq += subscriptionGroupWrapper.getSubscriptionGroupTable().size();
+
+ GetAllSubscriptionGroupResponseHeader responseHeader =
+ response.decodeCommandCustomHeader(GetAllSubscriptionGroupResponseHeader.class);
+ Integer totalGroupNum = Optional.ofNullable(responseHeader)
+ .map(GetAllSubscriptionGroupResponseHeader::getTotalGroupNum).orElse(null);
+
+ if (Objects.isNull(totalGroupNum)) {
+ // the server side don't support totalGroupNum, all data is returned
+ break;
+ }
+
+ if (!Objects.equals(currentDataVersion, newDataVersion)) {
+ log.error("dataVersion changed, currentDataVersion: {}, newDataVersion: {}",
+ currentDataVersion, newDataVersion);
+ currentDataVersion = newDataVersion;
+ groupSeq = 0;
+ subscriptionGroupTable.clear();
+ forbiddenTable.clear();
+ continue;
+ }
+
+ // groupSeq is the number of groups fetched so far; paging is complete only once it reaches the
+ // total. Stopping at total - 1 would drop the last group when a full page ends right before it.
+ if (groupSeq >= totalGroupNum) {
+ log.info("get all subscription group config, totalGroupNum: {}", totalGroupNum);
+ break;
+ }
+ } else {
+ throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}
- default:
- break;
}
- throw new MQBrokerException(response.getCode(), response.getRemark(),
brokerAddr);
+
+ SubscriptionGroupWrapper allSubscriptionGroup = new SubscriptionGroupWrapper();
+ allSubscriptionGroup.setSubscriptionGroupTable(subscriptionGroupTable);
+ allSubscriptionGroup.setForbiddenTable(forbiddenTable);
+ allSubscriptionGroup.setDataVersion(currentDataVersion);
+ return allSubscriptionGroup;
}
public SubscriptionGroupConfig getSubscriptionGroupConfig(final String
brokerAddr, String group,
@@ -2842,23 +2912,77 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback, StartAndShutdo
throw new MQBrokerException(response.getCode(), response.getRemark(),
brokerAddr);
}
- public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
- long timeoutMillis) throws RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+ /**
+ * Fetches the topic config table from the broker at {@code addr}, page by page.
+ *
+ * <p>Each GET_ALL_TOPIC_CONFIG request carries the number of entries already
+ * received ({@code topicSeq}), the page size taken from
+ * {@code clientConfig.getMaxPageSizeInGetMetadata()} and the JSON form of the
+ * data version seen so far (an empty string on the first request). Pages are
+ * merged into a single table.
+ *
+ * <p>If a response carries no {@code totalTopicNum}, the peer is treated as an
+ * old-version server that returned everything in one response. If the data
+ * version reported by a page differs from the one seen so far, everything
+ * collected is discarded and paging restarts from sequence 0.
+ *
+ * @param addr broker address to query
+ * @param timeoutMillis overall time budget shared by all page requests
+ * @return a wrapper holding the merged topic config table and its data version
+ * @throws RemotingTimeoutException if the overall time budget is exhausted
+ * @throws MQBrokerException if the broker answers with a non-success code
+ * @throws RemotingCommandException if the response header cannot be decoded
+ */
+ public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
long timeoutMillis)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ InterruptedException, MQBrokerException, RemotingCommandException {
+
+ DataVersion currentDataVersion = null;
+ int topicSeq = 0;
+ ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<>();
+ long beginTime = System.nanoTime();
+ while (true) {
+ // The caller's timeout covers the whole paged exchange, not each request.
+ long leftTime = timeoutMillis -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+ if (leftTime <= 0) {
+ throw new RemotingTimeoutException("invokeSync call timeout");
+ }
+
+ GetAllTopicConfigRequestHeader requestHeader = new
GetAllTopicConfigRequestHeader();
+ requestHeader.setTopicSeq(topicSeq);
+
requestHeader.setMaxTopicNum(clientConfig.getMaxPageSizeInGetMetadata());
+
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion).
+ map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+ log.info("getAllTopicConfig from seq {}, max {}, dataVersion {}",
+ topicSeq, requestHeader.getMaxTopicNum(),
requestHeader.getDataVersion());
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG,
requestHeader);
- RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
- request, timeoutMillis);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return TopicConfigSerializeWrapper.decode(response.getBody(),
TopicConfigSerializeWrapper.class);
+ RemotingCommand response = this.remotingClient.invokeSync(
+
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, leftTime);
+
+ assert response != null;
+ if (response.getCode() == SUCCESS) {
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+ TopicConfigSerializeWrapper.decode(response.getBody(),
TopicConfigSerializeWrapper.class);
+
topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+ topicSeq +=
topicConfigSerializeWrapper.getTopicConfigTable().size();
+
+ DataVersion newDataVersion =
topicConfigSerializeWrapper.getDataVersion();
+ if (currentDataVersion == null) {
+ // fill dataVersion before breaking out of the loop, to stay
compatible with old version servers
+ currentDataVersion = newDataVersion;
+ }
+
+ GetAllTopicConfigResponseHeader responseHeader =
+
response.decodeCommandCustomHeader(GetAllTopicConfigResponseHeader.class);
+ Integer totalTopicNum = Optional.ofNullable(responseHeader)
+
.map(GetAllTopicConfigResponseHeader::getTotalTopicNum).orElse(null);
+
+ if (Objects.isNull(totalTopicNum)) { // compatible with
old version server
+ // the server side doesn't support totalTopicNum, all data
is returned
+ break;
+ }
+
+ if (!Objects.equals(currentDataVersion, newDataVersion)) {
+ // The table changed between pages: drop what was collected and restart.
+ log.error("dataVersion changed, currentDataVersion: {},
newDataVersion: {}", currentDataVersion, newDataVersion);
+ currentDataVersion = newDataVersion;
+ topicSeq = 0;
+ topicConfigTable.clear();
+ continue;
+ }
+
+ // topicSeq is the number of entries received so far, so paging is
+ // complete only once it reaches totalTopicNum. Stopping at
+ // totalTopicNum - 1 would drop the final entry whenever exactly one
+ // topic is left for the next page.
+ if (topicSeq >= totalTopicNum) {
+ log.info("get all topic config, totalTopicNum: {}",
totalTopicNum);
+ break;
+ }
+ } else {
+ throw new MQBrokerException(response.getCode(),
response.getRemark(), addr);
}
- default:
- break;
+
}
- throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setDataVersion(currentDataVersion);
+ topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+ return topicConfigSerializeWrapper;
}
public void updateNameServerConfig(final Properties properties, final
List<String> nameServers, long timeoutMillis)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 3d0feec8a7..77f49554a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -425,6 +425,9 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean enableSplitRegistration = false;
+ private boolean enableSplitMetadata = true;
+ private int splitMetadataSize = 2000;
+
private long popInflightMessageThreshold = 10000;
private boolean enablePopMessageThreshold = false;
@@ -2060,4 +2063,20 @@ public class BrokerConfig extends BrokerIdentity {
public void setEnableCreateSysGroup(boolean enableCreateSysGroup) {
this.enableCreateSysGroup = enableCreateSysGroup;
}
+
+ public boolean isEnableSplitMetadata() {
+ return enableSplitMetadata;
+ }
+
+ public void setEnableSplitMetadata(boolean enableSplitMetadata) {
+ this.enableSplitMetadata = enableSplitMetadata;
+ }
+
+ public int getSplitMetadataSize() {
+ return splitMetadataSize;
+ }
+
+ public void setSplitMetadataSize(int splitMetadataSize) {
+ this.splitMetadataSize = splitMetadataSize;
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
index 7c159021aa..a1828d3d53 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
@@ -26,6 +26,8 @@ import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
public class SubscriptionGroupWrapper extends RemotingSerializable {
private ConcurrentMap<String, SubscriptionGroupConfig>
subscriptionGroupTable =
new ConcurrentHashMap<>(1024);
+ private ConcurrentMap<String, ConcurrentMap<String, Integer>>
forbiddenTable =
+ new ConcurrentHashMap<>(1024);
private DataVersion dataVersion = new DataVersion();
public ConcurrentMap<String, SubscriptionGroupConfig>
getSubscriptionGroupTable() {
@@ -37,6 +39,14 @@ public class SubscriptionGroupWrapper extends
RemotingSerializable {
this.subscriptionGroupTable = subscriptionGroupTable;
}
+ public ConcurrentMap<String, ConcurrentMap<String, Integer>>
getForbiddenTable() {
+ return forbiddenTable;
+ }
+
+ public void setForbiddenTable(ConcurrentMap<String, ConcurrentMap<String,
Integer>> forbiddenTable) {
+ this.forbiddenTable = forbiddenTable;
+ }
+
public DataVersion getDataVersion() {
return dataVersion;
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupRequestHeader.java
similarity index 59%
copy from
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
copy to
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupRequestHeader.java
index 566ce16fa4..6d67afdb9c 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupRequestHeader.java
@@ -15,22 +15,51 @@
* limitations under the License.
*/
-/**
- * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z
[email protected] $
- */
package org.apache.rocketmq.remoting.protocol.header;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;
-@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource =
ResourceType.TOPIC, action = Action.LIST)
-public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
-
+@RocketMQAction(value = RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, resource
= ResourceType.GROUP, action = Action.GET)
+public class GetAllSubscriptionGroupRequestHeader implements
CommandCustomHeader { // Request header for GET_ALL_SUBSCRIPTIONGROUP_CONFIG carrying the paging fields below.
@Override
public void checkFields() throws RemotingCommandException {
+ // nothing
+ }
+
+ @CFNotNull
+ private Integer groupSeq; // NOTE(review): presumably the start offset of the requested page (count of groups already received) - confirm against the broker handler
+
+ private String dataVersion; // NOTE(review): presumably the JSON form of the data version seen on earlier pages - confirm against the caller
+
+ private Integer maxGroupNum; // NOTE(review): presumably the maximum number of groups to return in one page - confirm against the caller
+
+ public Integer getGroupSeq() {
+ return groupSeq;
+ }
+
+ public void setGroupSeq(Integer groupSeq) {
+ this.groupSeq = groupSeq;
+ }
+
+ public String getDataVersion() {
+ return dataVersion;
+ }
+
+ public void setDataVersion(String dataVersion) {
+ this.dataVersion = dataVersion;
+ }
+
+ public Integer getMaxGroupNum() {
+ return maxGroupNum;
+ }
+
+ public void setMaxGroupNum(Integer maxGroupNum) {
+ this.maxGroupNum = maxGroupNum;
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupResponseHeader.java
similarity index 72%
copy from
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
copy to
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupResponseHeader.java
index 566ce16fa4..8f42a1b2a8 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupResponseHeader.java
@@ -15,22 +15,31 @@
* limitations under the License.
*/
-/**
- * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z
[email protected] $
- */
package org.apache.rocketmq.remoting.protocol.header;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;
-@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource =
ResourceType.TOPIC, action = Action.LIST)
-public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
-
+@RocketMQAction(value = RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, resource
= ResourceType.GROUP, action = Action.LIST)
+public class GetAllSubscriptionGroupResponseHeader implements
CommandCustomHeader { // Response header for GET_ALL_SUBSCRIPTIONGROUP_CONFIG carrying the total group count.
@Override
public void checkFields() throws RemotingCommandException { // no field validation is performed here
+
+ }
+
+ @CFNotNull
+ private Integer totalGroupNum; // total group count; the client treats a missing value as "server does not page, all data was returned"
+
+ public Integer getTotalGroupNum() {
+ return totalGroupNum;
+ }
+
+ public void setTotalGroupNum(Integer totalGroupNum) {
+ this.totalGroupNum = totalGroupNum;
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigRequestHeader.java
similarity index 62%
copy from
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
copy to
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigRequestHeader.java
index 566ce16fa4..769a814d34 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigRequestHeader.java
@@ -15,22 +15,51 @@
* limitations under the License.
*/
-/**
- * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z
[email protected] $
- */
package org.apache.rocketmq.remoting.protocol.header;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;
-@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource =
ResourceType.TOPIC, action = Action.LIST)
-public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
-
+@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource =
ResourceType.TOPIC, action = Action.GET)
+public class GetAllTopicConfigRequestHeader implements CommandCustomHeader { // Request header for the paged GET_ALL_TOPIC_CONFIG call.
@Override
public void checkFields() throws RemotingCommandException {
+ // nothing
+ }
+
+ @CFNotNull
+ private Integer topicSeq; // number of topic configs the client has already received; 0 on the first request
+
+ private String dataVersion; // JSON form of the data version seen on earlier pages; the client sends an empty string on the first request
+
+ private Integer maxTopicNum; // page size; the client fills it from clientConfig.getMaxPageSizeInGetMetadata()
+
+ public Integer getTopicSeq() {
+ return topicSeq;
+ }
+
+ public void setTopicSeq(Integer topicSeq) {
+ this.topicSeq = topicSeq;
+ }
+
+ public String getDataVersion() {
+ return dataVersion;
+ }
+
+ public void setDataVersion(String dataVersion) {
+ this.dataVersion = dataVersion;
+ }
+
+ public Integer getMaxTopicNum() {
+ return maxTopicNum;
+ }
+
+ public void setMaxTopicNum(Integer maxTopicNum) {
+ this.maxTopicNum = maxTopicNum;
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
index 566ce16fa4..446ed03c47 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -33,4 +34,15 @@ public class GetAllTopicConfigResponseHeader implements
CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
}
+
+ @CFNotNull
+ private Integer totalTopicNum;
+
+ public Integer getTotalTopicNum() {
+ return totalTopicNum;
+ }
+
+ public void setTotalTopicNum(Integer totalTopicNum) {
+ this.totalTopicNum = totalTopicNum;
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 9780df13dd..2e72af13ee 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -652,23 +652,23 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
}
@Override
- public SubscriptionGroupWrapper getAllSubscriptionGroup(final String
brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException {
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String
brokerAddr, long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr,
timeoutMillis);
}
@Override
- public SubscriptionGroupWrapper getUserSubscriptionGroup(final String
brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException {
+ public SubscriptionGroupWrapper getUserSubscriptionGroup(final String
brokerAddr, long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr,
timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String
brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException {
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr,
timeoutMillis);
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 1bdcc765d6..7b268cf694 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1636,14 +1636,16 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
}
@Override
- public SubscriptionGroupWrapper getAllSubscriptionGroup(final String
brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String
brokerAddr, long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
return
this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr,
timeoutMillis);
}
@Override
- public SubscriptionGroupWrapper getUserSubscriptionGroup(final String
brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
+ public SubscriptionGroupWrapper getUserSubscriptionGroup(final String
brokerAddr, long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
SubscriptionGroupWrapper subscriptionGroupWrapper =
this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr,
timeoutMillis);
Iterator<Entry<String, SubscriptionGroupConfig>> iterator =
subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet().iterator();
@@ -1658,8 +1660,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
@Override
- public TopicConfigSerializeWrapper getAllTopicConfig(final String
brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
+ public TopicConfigSerializeWrapper getAllTopicConfig(final String
brokerAddr, long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException, RemotingCommandException {
return
this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr,
timeoutMillis);
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index a10a58950d..46e2c066cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -316,16 +316,18 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws InterruptedException, MQBrokerException,
MQClientException, RemotingException;
SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException;
+ long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException, RemotingCommandException;
SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
- long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException;
+ long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException, RemotingCommandException;
TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException;
+ RemotingConnectException, MQBrokerException, RemotingCommandException;
TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr,
final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index ec5f7571d2..884764f853 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -491,7 +491,8 @@ public class DefaultMQAdminExtTest {
}
@Test
- public void testGetAllSubscriptionGroup() throws InterruptedException,
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException {
+ public void testGetAllSubscriptionGroup() throws InterruptedException,
MQBrokerException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
RemotingCommandException {
SubscriptionGroupWrapper subscriptionGroupWrapper =
defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");