This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7cb1b1ee86 [ISSUE #9467] Implement paged retrieval of Topic and SubscriptionGroup information (#9468)
7cb1b1ee86 is described below

commit 7cb1b1ee86e2facaf0eaf1b19d3c20b5df1522b6
Author: ltamber <[email protected]>
AuthorDate: Wed Jul 2 09:36:54 2025 +0800

    [ISSUE #9467] Implement paged retrieval of Topic and SubscriptionGroup information (#9468)
    
    * Implement paged retrieval of Topic and SubscriptionGroup information
    - Implemented pagination logic to support data retrieval by sequence number and maximum count.
    - Added data version checking to ensure the retrieved data is the latest.
    - Optimized the result structure to include total count and current page data.
    - Added unit tests
    
    * BrokerOuterAPI
    
    * add timeout & log
    
    ---------
    
    Co-authored-by: xiaoming.lt <[email protected]>
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 192 ++++++++++++++++++---
 .../broker/processor/AdminBrokerProcessor.java     | 117 ++++++++-----
 .../subscription/SubscriptionGroupManager.java     |  40 +++++
 .../rocketmq/broker/topic/TopicConfigManager.java  |  25 ++-
 .../broker/topic/TopicQueueMappingManager.java     |  17 ++
 .../broker/processor/AdminBrokerProcessorTest.java | 189 +++++++++++++++++++-
 .../broker/slave/SlaveSynchronizeAtomicTest.java   |   5 +-
 .../broker/slave/SlaveSynchronizeTest.java         |   4 +-
 .../subscription/SubscriptionGroupManagerTest.java |  80 ++++++++-
 .../broker/topic/TopicConfigManagerTest.java       |  74 ++++++++
 .../org/apache/rocketmq/client/ClientConfig.java   |  10 ++
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 176 ++++++++++++++++---
 .../org/apache/rocketmq/common/BrokerConfig.java   |  19 ++
 .../protocol/body/SubscriptionGroupWrapper.java    |  10 ++
 ...a => GetAllSubscriptionGroupRequestHeader.java} |  41 ++++-
 ... => GetAllSubscriptionGroupResponseHeader.java} |  21 ++-
 ...er.java => GetAllTopicConfigRequestHeader.java} |  41 ++++-
 .../header/GetAllTopicConfigResponseHeader.java    |  12 ++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  14 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  15 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  12 +-
 .../tools/admin/DefaultMQAdminExtTest.java         |   3 +-
 22 files changed, 982 insertions(+), 135 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 83edd88408..21ba349c84 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -23,9 +23,14 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -102,6 +107,10 @@ import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetBrokerMemberGroupRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
@@ -139,6 +148,8 @@ import 
org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.rpc.ClientMetadata;
 import org.apache.rocketmq.remoting.rpc.RpcClient;
 import org.apache.rocketmq.remoting.rpc.RpcClientImpl;
@@ -761,22 +772,86 @@ public class BrokerOuterAPI {
         return changedList;
     }
 
-    public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(
-        final String addr) throws RemotingConnectException, 
RemotingSendRequestException,
-        RemotingTimeoutException, InterruptedException, MQBrokerException {
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+    public TopicConfigAndMappingSerializeWrapper getAllTopicConfig(final 
String addr)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingCommandException {
+
+        DataVersion topicConfigDataVersion = null;
+        DataVersion mappingDataVersion = null;
+        long timeoutMills = getTimeoutMillis();
+        int topicSeq = 0;
+        long beginTime = System.nanoTime();
+        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
+        Map<String, TopicQueueMappingDetail> topicQueueMappingDetailMap = new 
ConcurrentHashMap<>();
+        while (true) {
+            long leftTime = timeoutMills - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+            if (leftTime < 0) {
+                throw new RemotingTimeoutException("invokeSync call timeout");
+            }
 
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 
3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return TopicConfigSerializeWrapper.decode(response.getBody(), 
TopicConfigAndMappingSerializeWrapper.class);
+            GetAllTopicConfigRequestHeader requestHeader = new 
GetAllTopicConfigRequestHeader();
+            requestHeader.setTopicSeq(topicSeq);
+            requestHeader.setMaxTopicNum(getMaxPageSize());
+            
requestHeader.setDataVersion(Optional.ofNullable(topicConfigDataVersion).
+                map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+            LOGGER.info("getAllTopicConfig from seq {}, max {}, dataVersion 
{}",
+                    topicSeq, requestHeader.getMaxTopicNum(), 
requestHeader.getDataVersion());
+            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, 
requestHeader);
+
+            RemotingCommand response = this.remotingClient.invokeSync(
+                MixAll.brokerVIPChannel(true, addr), request, 30000);
+
+            assert response != null;
+            if (response.getCode() == SUCCESS) {
+                TopicConfigAndMappingSerializeWrapper 
topicConfigSerializeWrapper =
+                    
TopicConfigAndMappingSerializeWrapper.decode(response.getBody(), 
TopicConfigAndMappingSerializeWrapper.class);
+                
topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+                
topicQueueMappingDetailMap.putAll(topicConfigSerializeWrapper.getTopicQueueMappingDetailMap());
+                topicSeq += 
topicConfigSerializeWrapper.getTopicConfigTable().size();
+
+
+                DataVersion newDataVersion = 
topicConfigSerializeWrapper.getDataVersion();
+                if (topicConfigDataVersion == null) {
+                    // fill dataVersion before break the loop to compatible 
with old version server
+                    topicConfigDataVersion = newDataVersion;
+                    mappingDataVersion = 
topicConfigSerializeWrapper.getMappingDataVersion();
+                }
+
+                GetAllTopicConfigResponseHeader responseHeader =
+                    
response.decodeCommandCustomHeader(GetAllTopicConfigResponseHeader.class);
+                Integer totalTopicNum = Optional.ofNullable(responseHeader)
+                    
.map(GetAllTopicConfigResponseHeader::getTotalTopicNum).orElse(null);
+
+                if (Objects.isNull(totalTopicNum)) {       // compatible with 
old version server
+                    // the server side don't support totalTopicNum, all data 
is returned
+                    break;
+                }
+
+                if (!Objects.equals(topicConfigDataVersion, newDataVersion)) {
+                    LOGGER.error("dataVersion changed, currentDataVersion: {}, 
newDataVersion: {}", topicConfigDataVersion, newDataVersion);
+                    topicConfigDataVersion = newDataVersion;
+                    mappingDataVersion = 
topicConfigSerializeWrapper.getMappingDataVersion();
+                    topicSeq = 0;
+                    topicConfigTable.clear();
+                    continue;
+                }
+
+                if (topicSeq >= totalTopicNum - 1) {
+                    LOGGER.info("get all topic config, totalTopicNum: {}", 
totalTopicNum);
+                    break;
+                }
+            } else {
+                throw new MQBrokerException(response.getCode(), 
response.getRemark(), addr);
             }
-            default:
-                break;
+
         }
 
-        throw new MQBrokerException(response.getCode(), response.getRemark(), 
addr);
+        TopicConfigAndMappingSerializeWrapper topicConfigSerializeWrapper = 
new TopicConfigAndMappingSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(topicConfigDataVersion);
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+        topicConfigSerializeWrapper.setMappingDataVersion(mappingDataVersion);
+        
topicConfigSerializeWrapper.setTopicQueueMappingDetailMap(topicQueueMappingDetailMap);
+        return topicConfigSerializeWrapper;
     }
 
     public TimerCheckpoint getTimerCheckPoint(
@@ -849,21 +924,82 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark(), 
addr);
     }
 
-    public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
-        final String addr) throws InterruptedException, 
RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
-        RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, 3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return SubscriptionGroupWrapper.decode(response.getBody(), 
SubscriptionGroupWrapper.class);
+    public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String 
addr)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
+
+        long timeoutMills = getTimeoutMillis();
+        DataVersion currentDataVersion = null;
+        int groupSeq = 0;
+        long beginTime = System.nanoTime();
+        ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable 
= new ConcurrentHashMap<>();
+        ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = 
new ConcurrentHashMap<>();
+
+        while (true) {
+            long leftTime = timeoutMills - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+            if (leftTime < 0) {
+                throw new RemotingTimeoutException("invokeSync call timeout");
+            }
+
+            GetAllSubscriptionGroupRequestHeader requestHeader = new 
GetAllSubscriptionGroupRequestHeader();
+            requestHeader.setGroupSeq(groupSeq);
+            requestHeader.setMaxGroupNum(getMaxPageSize());
+            
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
+                .map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+            LOGGER.info("getAllSubscriptionGroup from seq {}, max {}, 
dataVersion {}",
+                    groupSeq, requestHeader.getMaxGroupNum(), 
requestHeader.getDataVersion());
+            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 requestHeader);
+            RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, 30000);
+
+            assert response != null;
+            if (response.getCode() == SUCCESS) {
+                SubscriptionGroupWrapper subscriptionGroupWrapper =
+                    SubscriptionGroupWrapper.decode(response.getBody(), 
SubscriptionGroupWrapper.class);
+                
subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
+                
forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
+
+                DataVersion newDataVersion = 
subscriptionGroupWrapper.getDataVersion();
+                if (currentDataVersion == null) {
+                    // fill dataVersion before break the loop to compatible 
with old version server
+                    currentDataVersion = newDataVersion;
+                }
+
+                groupSeq += 
subscriptionGroupWrapper.getSubscriptionGroupTable().size();
+
+                GetAllSubscriptionGroupResponseHeader responseHeader =
+                    
response.decodeCommandCustomHeader(GetAllSubscriptionGroupResponseHeader.class);
+                Integer totalGroupNum = Optional.ofNullable(responseHeader)
+                    
.map(GetAllSubscriptionGroupResponseHeader::getTotalGroupNum).orElse(null);
+
+                if (Objects.isNull(totalGroupNum)) {
+                    // the server side don't support totalGroupNum, all data 
is returned
+                    break;
+                }
+
+                if (!Objects.equals(currentDataVersion, newDataVersion)) {
+                    LOGGER.error("dataVersion changed, currentDataVersion: {}, 
newDataVersion: {}",
+                        currentDataVersion, newDataVersion);
+                    currentDataVersion = newDataVersion;
+                    groupSeq = 0;
+                    subscriptionGroupTable.clear();
+                    forbiddenTable.clear();
+                    continue;
+                }
+
+                if (groupSeq >= totalGroupNum - 1) {
+                    LOGGER.info("get all subscription group config, 
totalGroupNum: {}", totalGroupNum);
+                    break;
+                }
+            } else {
+                throw new MQBrokerException(response.getCode(), 
response.getRemark(), addr);
             }
-            default:
-                break;
         }
 
-        throw new MQBrokerException(response.getCode(), response.getRemark(), 
addr);
+        SubscriptionGroupWrapper allSubscriptionGroup = new 
SubscriptionGroupWrapper();
+        allSubscriptionGroup.setSubscriptionGroupTable(subscriptionGroupTable);
+        allSubscriptionGroup.setForbiddenTable(forbiddenTable);
+        allSubscriptionGroup.setDataVersion(currentDataVersion);
+        return allSubscriptionGroup;
     }
 
     public void registerRPCHook(RPCHook rpcHook) {
@@ -1491,4 +1627,12 @@ public class BrokerOuterAPI {
         return pullResult;
     }
 
+    private int getMaxPageSize() {
+        return 2000;
+    }
+
+    private long getTimeoutMillis() {
+        return TimeUnit.SECONDS.toMillis(60);
+    }
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index fd8d925e3b..4203b3af82 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -44,6 +45,7 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.auth.authentication.enums.UserType;
 import 
org.apache.rocketmq.auth.authentication.exception.AuthenticationException;
@@ -68,6 +70,8 @@ import 
org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.broker.metrics.InvocationStatus;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
@@ -137,6 +141,7 @@ import 
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
 import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
 import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupList;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import 
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.TopicList;
@@ -157,6 +162,9 @@ import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader
 import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetBrokerConfigResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
@@ -816,40 +824,54 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, 
RemotingCommand request) {
+    private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, 
RemotingCommand request)
+        throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
-        // final GetAllTopicConfigResponseHeader responseHeader =
-        // (GetAllTopicConfigResponseHeader) response.readCustomHeader();
+        final GetAllTopicConfigResponseHeader responseHeader =
+            (GetAllTopicConfigResponseHeader) response.readCustomHeader();
+        final GetAllTopicConfigRequestHeader requestHeader =
+            
request.decodeCommandCustomHeader(GetAllTopicConfigRequestHeader.class);
+
+        String dataVersionStr = requestHeader.getDataVersion();
+        Integer topicSeq = requestHeader.getTopicSeq();
+        Integer maxTopicNum = requestHeader.getMaxTopicNum();
+
+        TopicConfigManager tcManager = 
brokerController.getTopicConfigManager();
+        TopicQueueMappingManager tqmManager = 
brokerController.getTopicQueueMappingManager();
 
         TopicConfigAndMappingSerializeWrapper 
topicConfigAndMappingSerializeWrapper = new 
TopicConfigAndMappingSerializeWrapper();
+        if (!brokerController.getBrokerConfig().isEnableSplitMetadata()
+            || ObjectUtils.allNull(dataVersionStr, topicSeq, maxTopicNum)) {  
// old client, return all topic config
 
-        
topicConfigAndMappingSerializeWrapper.setDataVersion(this.brokerController.getTopicConfigManager().getDataVersion());
-        
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(this.brokerController.getTopicConfigManager().getTopicConfigTable());
+            
topicConfigAndMappingSerializeWrapper.setDataVersion(tcManager.getDataVersion());
+            
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(tcManager.getTopicConfigTable());
 
-        
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(this.brokerController.getTopicQueueMappingManager().getDataVersion());
-        
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(this.brokerController.getTopicQueueMappingManager().getTopicQueueMappingTable());
+            
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(tqmManager.getDataVersion());
+            
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(tqmManager.getTopicQueueMappingTable());
+        } else {
+            int topicNum = 
Math.min(brokerController.getBrokerConfig().getSplitMetadataSize(),
+                Optional.ofNullable(maxTopicNum).orElse(Integer.MAX_VALUE));  
// use smaller value
+            ConcurrentHashMap<String, TopicConfig> subTopicConfigTable =
+                tcManager.subTopicConfigTable(dataVersionStr, topicSeq, 
topicNum);
+            
topicConfigAndMappingSerializeWrapper.setTopicConfigTable(subTopicConfigTable);
+            
topicConfigAndMappingSerializeWrapper.setDataVersion(tcManager.getDataVersion());
 
-        String content = topicConfigAndMappingSerializeWrapper.toJson();
-        if (content != null && content.length() > 0) {
-            try {
-                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
-            } catch (UnsupportedEncodingException e) {
-                LOGGER.error("", e);
+            
topicConfigAndMappingSerializeWrapper.setMappingDataVersion(tqmManager.getDataVersion());
+            
topicConfigAndMappingSerializeWrapper.setTopicQueueMappingDetailMap(
+                
tqmManager.subTopicQueueMappingTable(subTopicConfigTable.keySet()));
+        }
 
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setRemark("UnsupportedEncodingException " + 
e.getMessage());
-                return response;
-            }
+        
responseHeader.setTotalTopicNum(tcManager.getTopicConfigTable().size());
+        String content = topicConfigAndMappingSerializeWrapper.toJson();
+        if (StringUtils.isNotBlank(content)) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            response.setBody(content.getBytes(StandardCharsets.UTF_8));
         } else {
             LOGGER.error("No topic in this broker, client: {}", 
ctx.channel().remoteAddress());
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("No topic in this broker");
-            return response;
         }
-
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-
         return response;
     }
 
@@ -1584,28 +1606,45 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
     private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-        String content = 
this.brokerController.getSubscriptionGroupManager().encode();
-        if (content != null && content.length() > 0) {
-            try {
-                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
-            } catch (UnsupportedEncodingException e) {
-                LOGGER.error("UnsupportedEncodingException 
getAllSubscriptionGroup", e);
-
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setRemark("UnsupportedEncodingException " + 
e.getMessage());
-                return response;
-            }
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetAllSubscriptionGroupResponseHeader.class);
+        final GetAllSubscriptionGroupResponseHeader responseHeader =
+            (GetAllSubscriptionGroupResponseHeader) 
response.readCustomHeader();
+        final GetAllSubscriptionGroupRequestHeader requestHeader =
+            
request.decodeCommandCustomHeader(GetAllSubscriptionGroupRequestHeader.class);
+
+        String dataVersionStr = requestHeader.getDataVersion();
+        Integer groupSeq = requestHeader.getGroupSeq();
+        Integer maxGroupNum = requestHeader.getMaxGroupNum();
+
+        SubscriptionGroupManager sgManager = 
this.brokerController.getSubscriptionGroupManager();
+
+        SubscriptionGroupWrapper subscriptionGroupWrapper = new 
SubscriptionGroupWrapper();
+        if (!brokerController.getBrokerConfig().isEnableSplitMetadata()
+            || ObjectUtils.allNull(dataVersionStr, groupSeq, maxGroupNum)) {
+            
subscriptionGroupWrapper.setSubscriptionGroupTable(sgManager.getSubscriptionGroupTable());
+            
subscriptionGroupWrapper.setForbiddenTable(sgManager.getForbiddenTable());
+            
subscriptionGroupWrapper.setDataVersion(sgManager.getDataVersion());
+        } else {
+            int groupNum = 
Math.min(brokerController.getBrokerConfig().getSplitMetadataSize(),
+                Optional.ofNullable(maxGroupNum).orElse(Integer.MAX_VALUE));
+            ConcurrentMap<String, SubscriptionGroupConfig> subGroupTable =
+                sgManager.subGroupTable(dataVersionStr, groupSeq, groupNum);
+            subscriptionGroupWrapper.setSubscriptionGroupTable(subGroupTable);
+            
subscriptionGroupWrapper.setDataVersion(sgManager.getDataVersion());
+            
subscriptionGroupWrapper.setForbiddenTable(sgManager.subForbiddenTable(subGroupTable.keySet()));
+        }
+
+        
responseHeader.setTotalGroupNum(sgManager.getSubscriptionGroupTable().size());
+        String content = subscriptionGroupWrapper.toJson();
+        if (StringUtils.isNotBlank(content)) {
+            response.setBody(content.getBytes(StandardCharsets.UTF_8));
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
         } else {
             LOGGER.error("No subscription group in this broker, client:{} ", 
ctx.channel().remoteAddress());
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("No subscription group in this broker");
-            return response;
         }
-
-        response.setCode(ResponseCode.SUCCESS);
-        response.setRemark(null);
-
         return response;
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index f3e669fb3e..c7083365be 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -22,8 +22,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -322,10 +330,42 @@ public class SubscriptionGroupManager extends 
ConfigManager {
         return subscriptionGroupTable;
     }
 
+    public ConcurrentHashMap<String, SubscriptionGroupConfig> 
subGroupTable(String dataVersion, int groupSeq,
+        int maxGroupNum) {
+        // [groupSeq, groupSeq + maxGroupNum)
+        int beginIndex = groupSeq;
+        if (StringUtils.isBlank(dataVersion) || 
!Objects.equals(DataVersion.fromJson(dataVersion, DataVersion.class), 
this.dataVersion)) {
+            beginIndex = 0;
+            log.info("get sub subscription group table from {} due to {}", 
beginIndex,
+                StringUtils.isBlank(dataVersion) ? "DataVersion Empty" : 
"DataVersion Changed");
+        }
+
+        ConcurrentHashMap<String, SubscriptionGroupConfig> subGroupTable = new 
ConcurrentHashMap<>();
+        if (beginIndex < subscriptionGroupTable.size()) {
+            int endIndex = Math.min(beginIndex + maxGroupNum, 
subscriptionGroupTable.size());
+
+            ImmutableSortedMap<String, SubscriptionGroupConfig> sortedMap = 
ImmutableSortedMap.copyOf(subscriptionGroupTable);
+            
subGroupTable.putAll(sortedMap.subMap(sortedMap.keySet().asList().get(beginIndex),true,
+                sortedMap.keySet().asList().get(endIndex - 1),true));
+        }
+
+        return subGroupTable;
+    }
+
     public ConcurrentMap<String, ConcurrentMap<String, Integer>> 
getForbiddenTable() {
         return forbiddenTable;
     }
 
+    public ConcurrentMap<String, ConcurrentMap<String, Integer>> 
subForbiddenTable(Set<String> groupSet) {
+        if (MapUtils.isEmpty(forbiddenTable) || 
CollectionUtils.isEmpty(groupSet)) {
+            return Maps.newConcurrentMap();
+        }
+
+        return forbiddenTable.entrySet().stream()
+            .filter(e -> groupSet.contains(e.getKey()))
+            .collect(Collectors.toConcurrentMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
     public void setForbiddenTable(
         ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable) {
         this.forbiddenTable = forbiddenTable;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index b20cafc101..8211a8fb09 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,7 +30,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.collect.ImmutableMap;
-
+import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
@@ -703,6 +704,28 @@ public class TopicConfigManager extends ConfigManager {
         return topicConfigTable;
     }
 
+    /**
+     * Returns one page of the topic config table, ordered by topic name.
+     * <p>
+     * The page covers the sorted positions {@code [topicSeq, topicSeq + maxTopicNum)}. When
+     * {@code dataVersion} is blank or differs from this manager's current data version, the
+     * requested position is ignored and the page starts at 0.
+     * <p>
+     * All positions are resolved against one sorted snapshot of the table, so topics added or
+     * removed while the page is being built cannot push an index outside that snapshot.
+     *
+     * @param dataVersion JSON form of the data version the caller paged against; may be blank
+     * @param topicSeq    zero-based position of the first topic wanted; negative values are treated as 0
+     * @param maxTopicNum maximum number of topics to return; values &lt;= 0 yield an empty page
+     * @return a new map holding the requested page; empty when the position is past the end
+     */
+    public ConcurrentHashMap<String, TopicConfig> subTopicConfigTable(String dataVersion, int topicSeq,
+        int maxTopicNum) {
+        // [topicSeq, topicSeq + maxTopicNum)
+        int beginIndex = Math.max(topicSeq, 0);
+        boolean versionBlank = StringUtils.isBlank(dataVersion);
+        if (versionBlank || !Objects.equals(DataVersion.fromJson(dataVersion, DataVersion.class), this.dataVersion)) {
+            beginIndex = 0;
+            log.info("get sub topic config table from {} due to {}", beginIndex,
+                versionBlank ? "DataVersion Empty" : "DataVersion Changed");
+        }
+
+        ConcurrentHashMap<String, TopicConfig> subTopicConfigTable = new ConcurrentHashMap<>();
+        // Cheap pre-check that avoids sorting the whole table for a page that is obviously empty.
+        if (maxTopicNum <= 0 || beginIndex >= topicConfigTable.size()) {
+            return subTopicConfigTable;
+        }
+
+        // Take the snapshot once and derive every bound from it; the live table may change size
+        // between two reads.
+        ImmutableSortedMap<String, TopicConfig> sortedMap = ImmutableSortedMap.copyOf(topicConfigTable);
+        int total = sortedMap.size();
+        if (beginIndex >= total) {
+            return subTopicConfigTable;
+        }
+
+        // Widen to long so that a very large maxTopicNum cannot overflow into a negative end index.
+        int endIndex = (int) Math.min((long) beginIndex + maxTopicNum, total);
+        List<String> sortedTopics = sortedMap.keySet().asList();
+        subTopicConfigTable.putAll(sortedMap.subMap(sortedTopics.get(beginIndex), true,
+            sortedTopics.get(endIndex - 1), true));
+
+        return subTopicConfigTable;
+    }
+
     /**
      * Returns the attributes of the given topic config, or a new empty map when it has none.
      *
      * @param topicConfig topic config to read the attributes from
      * @return {@code topicConfig.getAttributes()} when non-null, otherwise a new {@code HashMap}
      */
     private Map<String, String> request(TopicConfig topicConfig) {
         return topicConfig.getAttributes() == null ? new HashMap<>() : 
topicConfig.getAttributes();
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 6b9cf15938..4b0714decb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -17,12 +17,19 @@
 package org.apache.rocketmq.broker.topic;
 
 import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Maps;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -178,6 +185,16 @@ public class TopicQueueMappingManager extends 
ConfigManager {
         return topicQueueMappingTable;
     }
 
+    /**
+     * Builds the slice of the topic queue mapping table that belongs to the given topics.
+     *
+     * @param topicSet topic names to keep; may be null or empty
+     * @return a new concurrent map holding the mapping details whose topic is contained in
+     *         {@code topicSet}; empty when the mapping table or {@code topicSet} is null or empty
+     */
+    public ConcurrentMap<String, TopicQueueMappingDetail> subTopicQueueMappingTable(Set<String> topicSet) {
+        ConcurrentMap<String, TopicQueueMappingDetail> selected = Maps.newConcurrentMap();
+        if (MapUtils.isEmpty(topicQueueMappingTable) || CollectionUtils.isEmpty(topicSet)) {
+            return selected;
+        }
+
+        for (Map.Entry<String, TopicQueueMappingDetail> entry : topicQueueMappingTable.entrySet()) {
+            if (topicSet.contains(entry.getKey())) {
+                selected.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return selected;
+    }
+
     /**
      * Returns the {@code dataVersion} field of this manager.
      */
     public DataVersion getDataVersion() {
         return dataVersion;
     }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index a6bcca954d..f3d0eb0782 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -17,9 +17,12 @@
 package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.auth.authentication.enums.UserType;
 import 
org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
 import org.apache.rocketmq.auth.authentication.model.Subject;
@@ -61,6 +64,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
@@ -71,6 +75,8 @@ import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
 import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
@@ -81,6 +87,9 @@ import 
org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
@@ -137,11 +146,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -455,12 +468,166 @@ public class AdminBrokerProcessorTest {
 
     @Test
     public void testGetAllTopicConfig() throws Exception {
-        GetAllTopicConfigResponseHeader getAllTopicConfigResponseHeader = new GetAllTopicConfigResponseHeader();
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, getAllTopicConfigResponseHeader);
+        // A default request header (no paging fields set) must still be answered with SUCCESS.
+        // The local is named after what it holds: a request header, not a response header.
+        GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
         RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
+
+    /**
+     * Drives GET_ALL_TOPIC_CONFIG first as an old client (no request header) and then as a paging
+     * client, and checks that both end up with the broker's complete topic config table.
+     * <p>
+     * While paging, the broker-side data version is bumped once after more than half of the topics
+     * have been fetched; the client side of the test is expected to notice the new version and
+     * restart from position 0.
+     *
+     * @param enableSplitMetadata value applied to the broker config's enableSplitMetadata switch
+     * @throws RemotingCommandException if the processor fails to handle a request
+     */
+    private void getAllTopicConfig(boolean enableSplitMetadata) throws RemotingCommandException {
+        brokerController.getBrokerConfig().setEnableSplitMetadata(enableSplitMetadata);
+
+        // old client, request null
+        RemotingCommand requestOldClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+        RemotingCommand responseOldClient = adminBrokerProcessor.processRequest(handlerContext, requestOldClient);
+        assertThat(responseOldClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapperOldClient =
+            TopicConfigSerializeWrapper.decode(responseOldClient.getBody(), TopicConfigSerializeWrapper.class);
+        assertThat(Maps.difference(topicConfigSerializeWrapperOldClient.getTopicConfigTable(),
+            brokerController.getTopicConfigManager().getTopicConfigTable()).areEqual()).isTrue();
+
+        // new client, request seq from 0
+        AtomicBoolean dataVersionChanged = new AtomicBoolean(false);
+        int topicSeq = 0;
+        DataVersion dataVersion = null;
+        int pageSize = ThreadLocalRandom.current().nextInt(500, brokerController.getBrokerConfig().getSplitMetadataSize());
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+        while (true) {
+            GetAllTopicConfigRequestHeader requestHeader = new GetAllTopicConfigRequestHeader();
+            requestHeader.setTopicSeq(topicSeq);
+            requestHeader.setMaxTopicNum(pageSize);
+            requestHeader.setDataVersion(Optional.ofNullable(dataVersion).map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+            RemotingCommand requestNewClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, requestHeader);
+            requestNewClient.makeCustomHeaderToNet();
+
+            RemotingCommand responseNewClient = adminBrokerProcessor.processRequest(handlerContext, requestNewClient);
+            GetAllTopicConfigResponseHeader responseHeader = (GetAllTopicConfigResponseHeader) responseNewClient.readCustomHeader();
+            assertThat(responseNewClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+            TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+                TopicConfigSerializeWrapper.decode(responseNewClient.getBody(), TopicConfigSerializeWrapper.class);
+            topicSeq += topicConfigSerializeWrapper.getTopicConfigTable().size();
+
+            assertThat(responseHeader.getTotalTopicNum())
+                .isEqualTo(brokerController.getTopicConfigManager().getTopicConfigTable().size());
+            assertThat(topicConfigSerializeWrapper.getDataVersion())
+                .isEqualTo(brokerController.getTopicConfigManager().getDataVersion());
+
+            DataVersion newDataVersion = topicConfigSerializeWrapper.getDataVersion();
+            if (dataVersion == null) {
+                dataVersion = newDataVersion;
+            }
+
+            // mock server side data version changed
+            if (topicSeq > responseHeader.getTotalTopicNum() / 2 && dataVersionChanged.compareAndSet(false, true)) {
+                brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+            }
+
+            if (!Objects.equals(dataVersion, newDataVersion)) {
+                dataVersion = newDataVersion;
+                topicSeq = 0;   // data version diff, from 0
+                topicConfigTable.clear();
+                continue;
+            }
+
+            topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+            // topicSeq counts the topics fetched so far, so paging is complete only once it reaches
+            // the total. Stopping at total - 1 would skip the last topic whenever exactly one topic
+            // remains after a full page, and the final comparison below would then fail.
+            if (topicSeq >= responseHeader.getTotalTopicNum()) {
+                break;
+            } else {
+                assertThat(topicConfigSerializeWrapper.getTopicConfigTable().size()).isEqualTo(pageSize);
+            }
+        }
+        assertThat(Maps.difference(topicConfigTable, brokerController.getTopicConfigManager().getTopicConfigTable()).areEqual()).isTrue();
+    }
+
+    /**
+     * Fills the topic table with 50000 topics, then runs the paging scenario with split metadata
+     * enabled and afterwards disabled (in which case the broker returns every topic config at once).
+     */
+    @Test
+    public void testGetAllTopicConfigWithRequestHeader() throws RemotingCommandException {
+        // topics cover [0, 50000)
+        final int topicCount = 50000;
+        fillTopicConfigTable(topicCount);
+
+        for (boolean splitEnabled : new boolean[] {true, false}) {
+            getAllTopicConfig(splitEnabled);
+        }
+    }
+
+
+    /**
+     * Drives GET_ALL_SUBSCRIPTIONGROUP_CONFIG first as an old client (no request header) and then
+     * as a paging client, and checks that the paged result matches the broker's subscription group
+     * table and forbidden table.
+     * <p>
+     * While paging, the broker-side data version is bumped once after more than half of the groups
+     * have been fetched; the client side of the test is expected to notice the new version and
+     * restart from position 0.
+     *
+     * @param enableSplitMetadata value applied to the broker config's enableSplitMetadata switch
+     * @throws RemotingCommandException if the processor fails to handle a request
+     */
+    private void getAllSubscriptionGroup(boolean enableSplitMetadata) throws RemotingCommandException {
+        brokerController.getBrokerConfig().setEnableSplitMetadata(enableSplitMetadata);
+
+        // old client, request null
+        RemotingCommand requestOldClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+        RemotingCommand responseOldClient = adminBrokerProcessor.processRequest(handlerContext, requestOldClient);
+        assertThat(responseOldClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        // new client, request from 0
+        AtomicBoolean dataVersionChanged = new AtomicBoolean(false);
+        int groupSeq = 0;
+        int pageSize = ThreadLocalRandom.current().nextInt(500, brokerController.getBrokerConfig().getSplitMetadataSize());
+        DataVersion dataVersion = null;
+        ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<>();
+        ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>();
+        while (true) {
+            GetAllSubscriptionGroupRequestHeader requestHeader = new GetAllSubscriptionGroupRequestHeader();
+            requestHeader.setGroupSeq(groupSeq);
+            requestHeader.setMaxGroupNum(pageSize);
+            requestHeader.setDataVersion(Optional.ofNullable(dataVersion).map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+            RemotingCommand requestNewClient = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, requestHeader);
+            requestNewClient.makeCustomHeaderToNet();
+            RemotingCommand responseNewClient = adminBrokerProcessor.processRequest(handlerContext, requestNewClient);
+            GetAllSubscriptionGroupResponseHeader responseHeader = (GetAllSubscriptionGroupResponseHeader) responseNewClient.readCustomHeader();
+            assertThat(responseNewClient.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+            SubscriptionGroupWrapper subscriptionGroupWrapper =
+                SubscriptionGroupWrapper.decode(responseNewClient.getBody(), SubscriptionGroupWrapper.class);
+
+            groupSeq += subscriptionGroupWrapper.getSubscriptionGroupTable().size();
+            DataVersion newDataVersion = subscriptionGroupWrapper.getDataVersion();
+
+            assertThat(responseHeader.getTotalGroupNum()).isEqualTo(
+                brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().size());
+            assertThat(newDataVersion).isEqualTo(brokerController.getSubscriptionGroupManager().getDataVersion());
+
+            if (dataVersion == null) {
+                dataVersion = newDataVersion;
+            }
+
+            // mock server side data version changed
+            if (groupSeq > responseHeader.getTotalGroupNum() / 2 && dataVersionChanged.compareAndSet(false, true)) {
+                brokerController.getSubscriptionGroupManager().getDataVersion().nextVersion();
+            }
+
+            if (!Objects.equals(dataVersion, newDataVersion)) {
+                dataVersion = newDataVersion;
+                groupSeq = 0;   // data version diff, from 0
+                subscriptionGroupTable.clear();
+                forbiddenTable.clear();
+                continue;
+            }
+
+            subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
+            forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
+            // groupSeq counts the groups fetched so far, so paging is complete only once it reaches
+            // the total. Stopping at total - 1 would skip the last group whenever exactly one group
+            // remains after a full page, and the final comparison below would then fail.
+            if (groupSeq >= responseHeader.getTotalGroupNum()) {
+                break;
+            } else {
+                assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().size()).isEqualTo(pageSize);
+            }
+        }
+        assertThat(Maps.difference(subscriptionGroupTable, brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable()).areEqual()).isTrue();
+        assertThat(Maps.difference(forbiddenTable, brokerController.getSubscriptionGroupManager().getForbiddenTable()).areEqual()).isTrue();
+    }
+
+    /**
+     * Fills the subscription group table with 50000 groups, then runs the paging scenario with
+     * split metadata enabled and afterwards disabled.
+     */
+    @Test
+    public void testGetAllSubscriptionGroupWithRequestHeader() throws RemotingCommandException {
+        final int groupCount = 50000;
+        fillSubscriptionGroupManager(groupCount);
+
+        for (boolean splitEnabled : new boolean[] {true, false}) {
+            getAllSubscriptionGroup(splitEnabled);
+        }
+    }
+
     @Test
     public void testUpdateBrokerConfig() throws Exception {
         handlerContext = mock(ChannelHandlerContext.class);
@@ -1434,4 +1601,24 @@ public class AdminBrokerProcessorTest {
     /**
      * Returns the result of {@code MixAll.isMac()}.
      * NOTE(review): presumably used to skip tests on macOS; confirm at the call sites.
      */
     private boolean notToBeExecuted() {
         return MixAll.isMac();
     }
+
+    /**
+     * Puts {@code num} topics into the broker's topic config table. Topic {@code i} is named
+     * {@code String.format("topic%05d", i)} for every {@code i} in {@code [0, num)}, with one read
+     * queue, one write queue and read/write permission.
+     *
+     * @param num number of topics to create
+     */
+    private void fillTopicConfigTable(int num) {
+        int index = num;
+        while (--index >= 0) {
+            String name = String.format("topic%05d", index);
+            TopicConfig config = new TopicConfig(name, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE, 0);
+            brokerController.getTopicConfigManager().getTopicConfigTable().put(name, config);
+        }
+    }
+
+    /**
+     * Puts {@code num} subscription groups into the broker's subscription group table. Group
+     * {@code i} is named {@code String.format("group-%05d", i)} for every {@code i} in
+     * {@code [0, num)} and carries the single attribute {@code "+test" -> "true"}.
+     *
+     * @param num number of groups to create
+     */
+    private void fillSubscriptionGroupManager(int num) {
+        int index = num;
+        while (--index >= 0) {
+            String name = String.format("group-%05d", index);
+            SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+            config.setGroupName(name);
+            Map<String, String> attributes = ImmutableMap.of("+test", "true");
+            config.setAttributes(attributes);
+            brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put(name, config);
+        }
+    }
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
index 75db22e7e7..714dcd967f 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
@@ -29,6 +29,7 @@ import 
org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -113,8 +114,8 @@ public class SlaveSynchronizeAtomicTest {
 
     @Test
     public void testSyncAtomically()
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException,
-            InterruptedException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, MQBrokerException,
+        InterruptedException, RemotingCommandException {
         
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
         
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
index 95db733d0d..c9461c4224 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -132,7 +133,8 @@ public class SlaveSynchronizeTest {
     }
 
     @Test
-    public void testSyncAll() throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, 
InterruptedException, UnsupportedEncodingException {
+    public void testSyncAll() throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException,
+        MQBrokerException, InterruptedException, UnsupportedEncodingException, 
RemotingCommandException {
         TopicConfig newTopicConfig = new TopicConfig("NewTopic");
         
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig));
         
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
index 3384d479c6..3c975a599b 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -24,13 +24,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.SubscriptionGroupAttributes;
 import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -45,7 +50,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-
 @RunWith(MockitoJUnitRunner.class)
 public class SubscriptionGroupManagerTest {
     private String group = "group";
@@ -58,9 +62,6 @@ public class SubscriptionGroupManagerTest {
 
     @Before
     public void before() {
-        if (notToBeExecuted()) {
-            return;
-        }
         SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
             "test",
             false,
@@ -166,4 +167,75 @@ public class SubscriptionGroupManagerTest {
 
     }
 
+    /**
+     * Checks the pages returned by {@code subGroupTable} for an empty table, a null data version,
+     * a non-matching data version, a begin index at the end of the table, a regular page and the
+     * final partial page.
+     */
+    @Test
+    public void testSubGroupTable() {
+        final int totalGroupNum = 50000;
+        final int maxNum = 200;
+
+        // Empty table: the page is empty too.
+        subscriptionGroupManager.getSubscriptionGroupTable().clear();
+        Map<String, SubscriptionGroupConfig> page =
+            subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(), 0, 200);
+        assertThat(page).isEmpty();
+
+        // Groups group-00000 .. group-49999.
+        fillSubscriptionGroupManager(totalGroupNum);
+
+        // Null data version: the first page is expected.
+        page = subscriptionGroupManager.subGroupTable(null, 0, maxNum);
+        assertGroupPage(page, maxNum, 0, maxNum - 1);
+
+        // Non-matching data version: position 300 is requested, yet the first page is expected.
+        DataVersion differentVersion = new DataVersion();
+        differentVersion.setCounter(new AtomicLong(1000L));  // different counter
+        differentVersion.setTimestamp(System.currentTimeMillis());
+        page = subscriptionGroupManager.subGroupTable(differentVersion.toJson(), 300, maxNum);
+        assertGroupPage(page, maxNum, 0, maxNum - 1);
+
+        // Begin index equal to the table size: nothing is left to return.
+        page = subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
+            totalGroupNum, 200);
+        Assert.assertTrue(page.isEmpty());
+
+        // Matching data version: the requested position is honoured.
+        int firstIndex = 300;
+        int lastIndex = firstIndex + maxNum - 1;
+        page = subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
+            firstIndex, maxNum);
+        assertGroupPage(page, maxNum, firstIndex, lastIndex);
+
+        // Final page holds fewer than maxNum groups.
+        firstIndex = 49950;
+        lastIndex = Math.min(subscriptionGroupManager.getSubscriptionGroupTable().size() - 1, firstIndex + maxNum - 1);
+        page = subscriptionGroupManager.subGroupTable(subscriptionGroupManager.getDataVersion().toJson(),
+            firstIndex, maxNum);
+        assertGroupPage(page, totalGroupNum - firstIndex, firstIndex, lastIndex);
+    }
+
+    /**
+     * Asserts that {@code page} has {@code expectedSize} entries, contains a randomly picked group
+     * from {@code [firstIndex, lastIndex)} and contains neither the group just before
+     * {@code firstIndex} nor the group just after {@code lastIndex}.
+     */
+    private static void assertGroupPage(Map<String, SubscriptionGroupConfig> page, int expectedSize,
+        int firstIndex, int lastIndex) {
+        Assert.assertEquals(expectedSize, page.size());
+        int sampled = ThreadLocalRandom.current().nextInt(firstIndex, lastIndex);
+        Assert.assertTrue(page.containsKey(String.format("group-%05d", sampled)));
+        Assert.assertFalse(page.containsKey(String.format("group-%05d", firstIndex - 1)));
+        Assert.assertFalse(page.containsKey(String.format("group-%05d", lastIndex + 1)));
+    }
+
+
+    /**
+     * Puts {@code num} subscription groups into the manager under test. Group {@code i} is named
+     * {@code String.format("group-%05d", i)} for every {@code i} in {@code [0, num)} and carries
+     * the single attribute {@code "+test" -> "true"}.
+     *
+     * @param num number of groups to create
+     */
+    private void fillSubscriptionGroupManager(int num) {
+        int index = num;
+        while (--index >= 0) {
+            String name = String.format("group-%05d", index);
+            SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+            config.setGroupName(name);
+            Map<String, String> attributes = ImmutableMap.of("+test", "true");
+            config.setAttributes(attributes);
+            subscriptionGroupManager.getSubscriptionGroupTable().put(name, config);
+        }
+    }
+
 }
\ No newline at end of file
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 3fd1d14c3a..5b2ea0b4d5 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -32,7 +35,9 @@ import org.apache.rocketmq.common.attribute.BooleanAttribute;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.attribute.EnumAttribute;
 import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Assert;
@@ -326,4 +331,73 @@ public class TopicConfigManagerTest {
 
         TopicAttributes.ALL.putAll(supportedAttributes);
     }
+
+    /**
+     * Replaces the manager's topic config table with a new table of {@code num} topics. Topic
+     * {@code i} is named {@code String.format("topic%05d", i)} for every {@code i} in
+     * {@code [0, num)}, with one read queue, one write queue and read/write permission.
+     *
+     * @param num number of topics to create
+     */
+    private void fillTopicConfigTable(int num) {
+        ConcurrentHashMap<String, TopicConfig> generated = new ConcurrentHashMap<>();
+        int index = num;
+        while (--index >= 0) {
+            String name = String.format("topic%05d", index);
+            generated.put(name, new TopicConfig(name, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE, 0));
+        }
+        topicConfigManager.setTopicConfigTable(generated);
+    }
+
+    /**
+     * Checks the pages returned by {@code subTopicConfigTable} for an empty table, a null data
+     * version, a non-matching data version, a begin index at the end of the table, a regular page
+     * and the final partial page.
+     */
+    @Test
+    public void testSubTopicConfigTable() {
+        final int totalTopicNum = 50000;
+        final int maxNum = 200;
+
+        // Empty table: the page is empty too.
+        topicConfigManager.getTopicConfigTable().clear();
+        Map<String, TopicConfig> page =
+            topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(), 0, 200);
+        Assert.assertTrue(page.isEmpty());
+
+        // init table, topic range [0, N)
+        fillTopicConfigTable(totalTopicNum);
+
+        // Null data version: the first page is expected.
+        page = topicConfigManager.subTopicConfigTable(null, 0, maxNum);
+        assertTopicPage(page, maxNum, 0, maxNum - 1);
+
+        // Non-matching data version: position 300 is requested, yet the first page is expected.
+        DataVersion differentVersion = new DataVersion();
+        differentVersion.setCounter(new AtomicLong(1000L));  // different counter
+        differentVersion.setTimestamp(System.currentTimeMillis());
+        page = topicConfigManager.subTopicConfigTable(differentVersion.toJson(), 300, maxNum);
+        assertTopicPage(page, maxNum, 0, maxNum - 1);
+
+        // Begin index equal to the table size: nothing is left to return.
+        page = topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
+            totalTopicNum, 200);
+        Assert.assertTrue(page.isEmpty());
+
+        // Matching data version: the requested position is honoured.
+        int firstIndex = 300;
+        int lastIndex = firstIndex + maxNum - 1;
+        page = topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
+            firstIndex, maxNum);
+        assertTopicPage(page, maxNum, firstIndex, lastIndex);
+
+        // Final page holds fewer than maxNum topics.
+        firstIndex = 49950;
+        lastIndex = Math.min(topicConfigManager.getTopicConfigTable().size() - 1, firstIndex + maxNum - 1);
+        page = topicConfigManager.subTopicConfigTable(topicConfigManager.getDataVersion().toJson(),
+            firstIndex, maxNum);
+        assertTopicPage(page, totalTopicNum - firstIndex, firstIndex, lastIndex);
+    }
+
+    /**
+     * Asserts that {@code page} has {@code expectedSize} entries, contains a randomly picked topic
+     * from {@code [firstIndex, lastIndex)} and contains neither the topic just before
+     * {@code firstIndex} nor the topic just after {@code lastIndex}.
+     */
+    private static void assertTopicPage(Map<String, TopicConfig> page, int expectedSize,
+        int firstIndex, int lastIndex) {
+        Assert.assertEquals(expectedSize, page.size());
+        int sampled = ThreadLocalRandom.current().nextInt(firstIndex, lastIndex);
+        Assert.assertTrue(page.containsKey(String.format("topic%05d", sampled)));
+        Assert.assertFalse(page.containsKey(String.format("topic%05d", firstIndex - 1)));
+        Assert.assertFalse(page.containsKey(String.format("topic%05d", lastIndex + 1)));
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java 
b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 696b073b37..79cb04af1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -111,6 +111,8 @@ public class ClientConfig {
      */
     protected String traceTopic;
 
+    protected int maxPageSizeInGetMetadata = 2000;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -515,6 +517,14 @@ public class ClientConfig {
         this.traceTopic = traceTopic;
     }
 
+    public int getMaxPageSizeInGetMetadata() {
+        return maxPageSizeInGetMetadata;
+    }
+
+    public void setMaxPageSizeInGetMetadata(int maxPageSizeInGetMetadata) {
+        this.maxPageSizeInGetMetadata = maxPageSizeInGetMetadata;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig{" +
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c001b33fa9..95bb0e8a96 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -85,6 +85,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -151,6 +152,10 @@ import 
org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonReq
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllProducerInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllSubscriptionGroupResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsInBrokerHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
@@ -242,8 +247,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
@@ -2806,21 +2816,81 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr,
-        long timeoutMillis) throws InterruptedException,
-        RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
-        RemotingCommand response = this.remotingClient
-            
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return SubscriptionGroupWrapper.decode(response.getBody(), 
SubscriptionGroupWrapper.class);
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
+        // Paged fetch: groupSeq is the number of groups received so far and the groupSeq of the next request.
+        DataVersion currentDataVersion = null;
+        int groupSeq = 0;
+        ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable 
= new ConcurrentHashMap<>();
+        ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = 
new ConcurrentHashMap<>();
+        long beginTime = System.nanoTime();
+        while (true) {
+            long leftTime = timeoutMillis - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+            if (leftTime <= 0) {
+                throw new RemotingTimeoutException("invokeSync call timeout");
+            }
+
+            GetAllSubscriptionGroupRequestHeader requestHeader = new 
GetAllSubscriptionGroupRequestHeader();
+            requestHeader.setGroupSeq(groupSeq);
+            
requestHeader.setMaxGroupNum(clientConfig.getMaxPageSizeInGetMetadata());
+            
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion)
+                .map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+            log.info("getAllSubscriptionGroup from seq {}, max {}, dataVersion 
{}",
+                    groupSeq, requestHeader.getMaxGroupNum(), 
requestHeader.getDataVersion());
+            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 requestHeader);
+            RemotingCommand response = this.remotingClient.invokeSync(
+                
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), 
request, leftTime);
+
+            assert response != null;
+            if (response.getCode() == SUCCESS) {
+                SubscriptionGroupWrapper subscriptionGroupWrapper =
+                    SubscriptionGroupWrapper.decode(response.getBody(), 
SubscriptionGroupWrapper.class);
+                
subscriptionGroupTable.putAll(subscriptionGroupWrapper.getSubscriptionGroupTable());
+                
forbiddenTable.putAll(subscriptionGroupWrapper.getForbiddenTable());
+
+                DataVersion newDataVersion = 
subscriptionGroupWrapper.getDataVersion();
+                if (currentDataVersion == null) {
+                    // record the dataVersion before any break below, so the 
result is also complete for old-version servers
+                    currentDataVersion = newDataVersion;
+                }
+
+                groupSeq += 
subscriptionGroupWrapper.getSubscriptionGroupTable().size();
+
+                GetAllSubscriptionGroupResponseHeader responseHeader =
+                    
response.decodeCommandCustomHeader(GetAllSubscriptionGroupResponseHeader.class);
+                Integer totalGroupNum = Optional.ofNullable(responseHeader)
+                    
.map(GetAllSubscriptionGroupResponseHeader::getTotalGroupNum).orElse(null);
+
+                if (Objects.isNull(totalGroupNum)) {
+                    // the server side doesn't support totalGroupNum, so all 
data was returned in this single response
+                    break;
+                }
+
+                if (!Objects.equals(currentDataVersion, newDataVersion)) {
+                    log.error("dataVersion changed, currentDataVersion: {}, 
newDataVersion: {}",
+                        currentDataVersion, newDataVersion);
+                    currentDataVersion = newDataVersion;
+                    groupSeq = 0;
+                    subscriptionGroupTable.clear();
+                    forbiddenTable.clear();
+                    continue;
+                }
+
+                if (groupSeq >= totalGroupNum) { // stop only once every group has been received
+                    log.info("get all subscription group config, 
totalGroupNum: {}", totalGroupNum);
+                    break;
+                }
+            } else {
+                throw new MQBrokerException(response.getCode(), 
response.getRemark(), brokerAddr);
             }
-            default:
-                break;
         }
-        throw new MQBrokerException(response.getCode(), response.getRemark(), 
brokerAddr);
+
+        SubscriptionGroupWrapper allSubscriptionGroup = new 
SubscriptionGroupWrapper();
+        allSubscriptionGroup.setSubscriptionGroupTable(subscriptionGroupTable);
+        allSubscriptionGroup.setForbiddenTable(forbiddenTable);
+        allSubscriptionGroup.setDataVersion(currentDataVersion);
+        return allSubscriptionGroup;
     }
 
     public SubscriptionGroupConfig getSubscriptionGroupConfig(final String 
brokerAddr, String group,
@@ -2842,23 +2912,77 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback, StartAndShutdo
         throw new MQBrokerException(response.getCode(), response.getRemark(), 
brokerAddr);
     }
 
-    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
-        long timeoutMillis) throws RemotingConnectException,
-        RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException, MQBrokerException {
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, 
long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException, MQBrokerException, RemotingCommandException {
+        // Paged fetch: topicSeq is the number of topics received so far and the topicSeq of the next request.
+        DataVersion currentDataVersion = null;
+        int topicSeq = 0;
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
+        long beginTime = System.nanoTime();
+        while (true) {
+            long leftTime = timeoutMillis - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime);
+            if (leftTime <= 0) {
+                throw new RemotingTimeoutException("invokeSync call timeout");
+            }
+
+            GetAllTopicConfigRequestHeader requestHeader = new 
GetAllTopicConfigRequestHeader();
+            requestHeader.setTopicSeq(topicSeq);
+            
requestHeader.setMaxTopicNum(clientConfig.getMaxPageSizeInGetMetadata());
+            
requestHeader.setDataVersion(Optional.ofNullable(currentDataVersion).
+                map(DataVersion::toJson).orElse(StringUtils.EMPTY));
+            log.info("getAllTopicConfig from seq {}, max {}, dataVersion {}",
+                    topicSeq, requestHeader.getMaxTopicNum(), 
requestHeader.getDataVersion());
+            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, 
requestHeader);
 
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-            request, timeoutMillis);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return TopicConfigSerializeWrapper.decode(response.getBody(), 
TopicConfigSerializeWrapper.class);
+            RemotingCommand response = this.remotingClient.invokeSync(
+                
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), 
request, leftTime);
+
+            assert response != null;
+            if (response.getCode() == SUCCESS) {
+                TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+                    TopicConfigSerializeWrapper.decode(response.getBody(), 
TopicConfigSerializeWrapper.class);
+                
topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+                topicSeq += 
topicConfigSerializeWrapper.getTopicConfigTable().size();
+
+                DataVersion newDataVersion = 
topicConfigSerializeWrapper.getDataVersion();
+                if (currentDataVersion == null) {
+                    // record the dataVersion before any break below, so the 
result is also complete for old-version servers
+                    currentDataVersion = newDataVersion;
+                }
+
+                GetAllTopicConfigResponseHeader responseHeader =
+                    
response.decodeCommandCustomHeader(GetAllTopicConfigResponseHeader.class);
+                Integer totalTopicNum = Optional.ofNullable(responseHeader)
+                    
.map(GetAllTopicConfigResponseHeader::getTotalTopicNum).orElse(null);
+
+                if (Objects.isNull(totalTopicNum)) {       // compatible with 
old-version servers
+                    // the server side doesn't support totalTopicNum, so all 
data was returned in this single response
+                    break;
+                }
+
+                if (!Objects.equals(currentDataVersion, newDataVersion)) {
+                    log.error("dataVersion changed, currentDataVersion: {}, 
newDataVersion: {}", currentDataVersion, newDataVersion);
+                    currentDataVersion = newDataVersion;
+                    topicSeq = 0;
+                    topicConfigTable.clear();
+                    continue;
+                }
+
+                if (topicSeq >= totalTopicNum) { // stop only once every topic has been received
+                    log.info("get all topic config, totalTopicNum: {}", 
totalTopicNum);
+                    break;
+                }
+            } else {
+                throw new MQBrokerException(response.getCode(), 
response.getRemark(), addr);
             }
-            default:
-                break;
+
         }
 
-        throw new MQBrokerException(response.getCode(), response.getRemark(), 
addr);
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new 
TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(currentDataVersion);
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+        return topicConfigSerializeWrapper;
     }
 
     public void updateNameServerConfig(final Properties properties, final 
List<String> nameServers, long timeoutMillis)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 3d0feec8a7..77f49554a5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -425,6 +425,9 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean enableSplitRegistration = false;
 
+    private boolean enableSplitMetadata = true;
+    private int splitMetadataSize = 2000;
+
     private long popInflightMessageThreshold = 10000;
     private boolean enablePopMessageThreshold = false;
 
@@ -2060,4 +2063,20 @@ public class BrokerConfig extends BrokerIdentity {
     public void setEnableCreateSysGroup(boolean enableCreateSysGroup) {
         this.enableCreateSysGroup = enableCreateSysGroup;
     }
+
+    public boolean isEnableSplitMetadata() {
+        return enableSplitMetadata;
+    }
+
+    public void setEnableSplitMetadata(boolean enableSplitMetadata) {
+        this.enableSplitMetadata = enableSplitMetadata;
+    }
+
+    public int getSplitMetadataSize() {
+        return splitMetadataSize;
+    }
+
+    public void setSplitMetadataSize(int splitMetadataSize) {
+        this.splitMetadataSize = splitMetadataSize;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
index 7c159021aa..a1828d3d53 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/SubscriptionGroupWrapper.java
@@ -26,6 +26,8 @@ import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
 public class SubscriptionGroupWrapper extends RemotingSerializable {
     private ConcurrentMap<String, SubscriptionGroupConfig> 
subscriptionGroupTable =
         new ConcurrentHashMap<>(1024);
+    private ConcurrentMap<String, ConcurrentMap<String, Integer>> 
forbiddenTable =
+        new ConcurrentHashMap<>(1024);
     private DataVersion dataVersion = new DataVersion();
 
     public ConcurrentMap<String, SubscriptionGroupConfig> 
getSubscriptionGroupTable() {
@@ -37,6 +39,14 @@ public class SubscriptionGroupWrapper extends 
RemotingSerializable {
         this.subscriptionGroupTable = subscriptionGroupTable;
     }
 
+    public ConcurrentMap<String, ConcurrentMap<String, Integer>> 
getForbiddenTable() {
+        return forbiddenTable;
+    }
+
+    public void setForbiddenTable(ConcurrentMap<String, ConcurrentMap<String, 
Integer>> forbiddenTable) {
+        this.forbiddenTable = forbiddenTable;
+    }
+
     public DataVersion getDataVersion() {
         return dataVersion;
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupRequestHeader.java
similarity index 59%
copy from 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
copy to 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupRequestHeader.java
index 566ce16fa4..6d67afdb9c 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupRequestHeader.java
@@ -15,22 +15,51 @@
  * limitations under the License.
  */
 
-/**
- * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z 
[email protected] $
- */
 package org.apache.rocketmq.remoting.protocol.header;
 
 import org.apache.rocketmq.common.action.Action;
 import org.apache.rocketmq.common.action.RocketMQAction;
 import org.apache.rocketmq.common.resource.ResourceType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 
-@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource = 
ResourceType.TOPIC, action = Action.LIST)
-public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
-
+@RocketMQAction(value = RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, resource 
= ResourceType.GROUP, action = Action.GET)
+public class GetAllSubscriptionGroupRequestHeader implements 
CommandCustomHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
+        // nothing
+    }
+
+    @CFNotNull
+    private Integer groupSeq;
+
+    private String dataVersion;
+
+    private Integer maxGroupNum;
+
+    public Integer getGroupSeq() {
+        return groupSeq;
+    }
+
+    public void setGroupSeq(Integer groupSeq) {
+        this.groupSeq = groupSeq;
+    }
+
+    public String getDataVersion() {
+        return dataVersion;
+    }
+
+    public void setDataVersion(String dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+
+    public Integer getMaxGroupNum() {
+        return maxGroupNum;
+    }
+
+    public void setMaxGroupNum(Integer maxGroupNum) {
+        this.maxGroupNum = maxGroupNum;
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupResponseHeader.java
similarity index 72%
copy from 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
copy to 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupResponseHeader.java
index 566ce16fa4..8f42a1b2a8 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllSubscriptionGroupResponseHeader.java
@@ -15,22 +15,31 @@
  * limitations under the License.
  */
 
-/**
- * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z 
[email protected] $
- */
 package org.apache.rocketmq.remoting.protocol.header;
 
 import org.apache.rocketmq.common.action.Action;
 import org.apache.rocketmq.common.action.RocketMQAction;
 import org.apache.rocketmq.common.resource.ResourceType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 
-@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource = 
ResourceType.TOPIC, action = Action.LIST)
-public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
-
+@RocketMQAction(value = RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, resource 
= ResourceType.GROUP, action = Action.LIST)
+public class GetAllSubscriptionGroupResponseHeader implements 
CommandCustomHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
+
+    }
+
+    @CFNotNull
+    private Integer totalGroupNum;
+
+    public Integer getTotalGroupNum() {
+        return totalGroupNum;
+    }
+
+    public void setTotalGroupNum(Integer totalGroupNum) {
+        this.totalGroupNum = totalGroupNum;
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigRequestHeader.java
similarity index 62%
copy from 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
copy to 
remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigRequestHeader.java
index 566ce16fa4..769a814d34 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigRequestHeader.java
@@ -15,22 +15,51 @@
  * limitations under the License.
  */
 
-/**
- * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z 
[email protected] $
- */
 package org.apache.rocketmq.remoting.protocol.header;
 
 import org.apache.rocketmq.common.action.Action;
 import org.apache.rocketmq.common.action.RocketMQAction;
 import org.apache.rocketmq.common.resource.ResourceType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 
-@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource = 
ResourceType.TOPIC, action = Action.LIST)
-public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
-
+@RocketMQAction(value = RequestCode.GET_ALL_TOPIC_CONFIG, resource = 
ResourceType.TOPIC, action = Action.GET)
+public class GetAllTopicConfigRequestHeader implements CommandCustomHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
+        // nothing
+    }
+
+    @CFNotNull
+    private Integer topicSeq;
+
+    private String dataVersion;
+
+    private Integer maxTopicNum;
+
+    public Integer getTopicSeq() {
+        return topicSeq;
+    }
+
+    public void setTopicSeq(Integer topicSeq) {
+        this.topicSeq = topicSeq;
+    }
+
+    public String getDataVersion() {
+        return dataVersion;
+    }
+
+    public void setDataVersion(String dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+
+    public Integer getMaxTopicNum() {
+        return maxTopicNum;
+    }
+
+    public void setMaxTopicNum(Integer maxTopicNum) {
+        this.maxTopicNum = maxTopicNum;
     }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
index 566ce16fa4..446ed03c47 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetAllTopicConfigResponseHeader.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.action.Action;
 import org.apache.rocketmq.common.action.RocketMQAction;
 import org.apache.rocketmq.common.resource.ResourceType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 
@@ -33,4 +34,15 @@ public class GetAllTopicConfigResponseHeader implements 
CommandCustomHeader {
     @Override
     public void checkFields() throws RemotingCommandException {
     }
+
+    @CFNotNull
+    private Integer totalTopicNum;
+
+    public Integer getTotalTopicNum() {
+        return totalTopicNum;
+    }
+
+    public void setTotalTopicNum(Integer totalTopicNum) {
+        this.totalTopicNum = totalTopicNum;
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 9780df13dd..2e72af13ee 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -652,23 +652,23 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
     }
 
     @Override
-    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException {
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
         return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr, 
timeoutMillis);
     }
 
     @Override
-    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String 
brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException {
+    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String 
brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
         return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, 
timeoutMillis);
     }
 
     @Override
     public TopicConfigSerializeWrapper getAllTopicConfig(final String 
brokerAddr,
         long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException {
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
         return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, 
timeoutMillis);
     }
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 1bdcc765d6..7b268cf694 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1636,14 +1636,16 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
         return 
this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, 
timeoutMillis);
     }
 
     @Override
-    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String 
brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String 
brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
         SubscriptionGroupWrapper subscriptionGroupWrapper = 
this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, 
timeoutMillis);
 
         Iterator<Entry<String, SubscriptionGroupConfig>> iterator = 
subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet().iterator();
@@ -1658,8 +1660,9 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
     }
 
     @Override
-    public TopicConfigSerializeWrapper getAllTopicConfig(final String 
brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+    public TopicConfigSerializeWrapper getAllTopicConfig(final String 
brokerAddr, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException, RemotingCommandException {
         return 
this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, 
timeoutMillis);
     }
 
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index a10a58950d..46e2c066cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -316,16 +316,18 @@ public interface MQAdminExt extends MQAdmin {
         final String topic) throws InterruptedException, MQBrokerException, 
MQClientException, RemotingException;
 
     SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException;
+        long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException, RemotingCommandException;
 
     SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
-        long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException;
+        long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException, RemotingCommandException;
 
     TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
         long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
-        RemotingConnectException, MQBrokerException;
+        RemotingConnectException, MQBrokerException, RemotingCommandException;
 
     TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, 
final boolean specialTopic,
         long timeoutMillis) throws InterruptedException, RemotingException,
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index ec5f7571d2..884764f853 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -491,7 +491,8 @@ public class DefaultMQAdminExtTest {
     }
 
     @Test
-    public void testGetAllSubscriptionGroup() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+    public void testGetAllSubscriptionGroup() throws InterruptedException, 
MQBrokerException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException {
         SubscriptionGroupWrapper subscriptionGroupWrapper = 
defaultMQAdminExt.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
         
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
         
assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");


Reply via email to