This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new d58e13d Proxy Support And ConsumerGroup Enhancement (#207)
d58e13d is described below
commit d58e13da95b992cb3d6b05de1b1fe0ee80cb1e87
Author: Akai <[email protected]>
AuthorDate: Wed Jun 12 09:12:19 2024 +0800
Proxy Support And ConsumerGroup Enhancement (#207)
* Support dashboard v4-v5 switch And query for v5 topic
* Modify tag name
* Support proxy-module And Fix the problem of showing wrong
consumerGroup-info
---------
Co-authored-by: yuanziwei <[email protected]>
---
.../rocketmq/dashboard/config/RMQConfigure.java | 23 +++++
.../dashboard/controller/ConsumerController.java | 16 ++--
.../dashboard/controller/ProxyController.java | 54 ++++++++++++
.../rocketmq/dashboard/model/GroupConsumeInfo.java | 27 ++++--
.../dashboard/service/ConsumerService.java | 8 +-
.../rocketmq/dashboard/service/ProxyService.java | 28 +++++++
.../dashboard/service/client/MQAdminExtImpl.java | 5 +-
.../dashboard/service/client/ProxyAdmin.java | 28 +++++++
.../dashboard/service/client/ProxyAdminImpl.java | 60 +++++++++++++
.../service/impl/ConsumerServiceImpl.java | 67 +++++++++++----
.../dashboard/service/impl/ProxyServiceImpl.java | 59 +++++++++++++
.../rocketmq/dashboard/task/MonitorTask.java | 2 +-
src/main/resources/application.yml | 3 +
src/main/resources/static/index.html | 1 +
src/main/resources/static/src/app.js | 3 +
src/main/resources/static/src/consumer.js | 9 +-
src/main/resources/static/src/i18n/en.js | 1 +
src/main/resources/static/src/i18n/zh.js | 1 +
src/main/resources/static/src/proxy.js | 97 ++++++++++++++++++++++
src/main/resources/static/view/layout/_header.html | 1 +
src/main/resources/static/view/pages/consumer.html | 4 +-
src/main/resources/static/view/pages/proxy.html | 67 +++++++++++++++
22 files changed, 516 insertions(+), 48 deletions(-)
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
index 991a2d8..5ce21ff 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/config/RMQConfigure.java
@@ -43,6 +43,8 @@ public class RMQConfigure {
//use rocketmq.namesrv.addr first,if it is empty,than use system proerty
or system env
private volatile String namesrvAddr =
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ private volatile String proxyAddr;
+
private volatile String isVIPChannel =
System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true");
@@ -62,6 +64,8 @@ public class RMQConfigure {
private List<String> namesrvAddrs = new ArrayList<>();
+ private List<String> proxyAddrs = new ArrayList<>();
+
public String getAccessKey() {
return accessKey;
}
@@ -86,6 +90,25 @@ public class RMQConfigure {
return namesrvAddrs;
}
+ public List<String> getProxyAddrs() {
+ return this.proxyAddrs;
+ }
+
+ public void setProxyAddrs(List<String> proxyAddrs) {
+ this.proxyAddrs = proxyAddrs;
+ if (CollectionUtils.isNotEmpty(proxyAddrs)) {
+ this.setProxyAddr(proxyAddrs.get(0));
+ }
+ }
+
+ public String getProxyAddr() {
+ return proxyAddr;
+ }
+
+ public void setProxyAddr(String proxyAddr) {
+ this.proxyAddr = proxyAddr;
+ }
+
public void setNamesrvAddrs(List<String> namesrvAddrs) {
this.namesrvAddrs = namesrvAddrs;
if (CollectionUtils.isNotEmpty(namesrvAddrs)) {
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
index d9f22e4..96fc056 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/controller/ConsumerController.java
@@ -47,14 +47,14 @@ public class ConsumerController {
@RequestMapping(value = "/groupList.query")
@ResponseBody
- public Object list(@RequestParam(value = "skipSysGroup", required = false)
boolean skipSysGroup) {
- return consumerService.queryGroupList(skipSysGroup);
+ public Object list(@RequestParam(value = "skipSysGroup", required = false)
boolean skipSysGroup, String address) {
+ return consumerService.queryGroupList(skipSysGroup, address);
}
@RequestMapping(value = "/group.query")
@ResponseBody
- public Object groupQuery(@RequestParam String consumerGroup) {
- return consumerService.queryGroup(consumerGroup);
+ public Object groupQuery(@RequestParam String consumerGroup, String
address) {
+ return consumerService.queryGroup(consumerGroup, address);
}
@RequestMapping(value = "/resetOffset.do", method = {RequestMethod.POST})
@@ -99,14 +99,14 @@ public class ConsumerController {
@RequestMapping(value = "/queryTopicByConsumer.query")
@ResponseBody
- public Object queryConsumerByTopic(@RequestParam String consumerGroup) {
- return consumerService.queryConsumeStatsListByGroupName(consumerGroup);
+ public Object queryConsumerByTopic(@RequestParam String consumerGroup,
String address) {
+ return consumerService.queryConsumeStatsListByGroupName(consumerGroup,
address);
}
@RequestMapping(value = "/consumerConnection.query")
@ResponseBody
- public Object consumerConnection(@RequestParam(required = false) String
consumerGroup) {
- ConsumerConnection consumerConnection =
consumerService.getConsumerConnection(consumerGroup);
+ public Object consumerConnection(@RequestParam(required = false) String
consumerGroup, String address) {
+ ConsumerConnection consumerConnection =
consumerService.getConsumerConnection(consumerGroup, address);
consumerConnection.setConnectionSet(ConnectionInfo.buildConnectionInfoHashSet(consumerConnection.getConnectionSet()));
return consumerConnection;
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
new file mode 100644
index 0000000..27aa59d
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/dashboard/controller/ProxyController.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.controller;
+
+import org.apache.rocketmq.dashboard.permisssion.Permission;
+import org.apache.rocketmq.dashboard.service.ProxyService;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import javax.annotation.Resource;
+
+@Controller
+@RequestMapping("/proxy")
+@Permission
+public class ProxyController {
+ @Resource
+ private ProxyService proxyService;
+ @RequestMapping(value = "/homePage.query", method = RequestMethod.GET)
+ @ResponseBody
+ public Object homePage() {
+ return proxyService.getProxyHomePage();
+ }
+
+ @RequestMapping(value = "/addProxyAddr.do", method = RequestMethod.POST)
+ @ResponseBody
+ public Object addProxyAddr(@RequestParam String newProxyAddr) {
+ proxyService.addProxyAddrList(newProxyAddr);
+ return true;
+ }
+
+ @RequestMapping(value = "/updateProxyAddr.do", method = RequestMethod.POST)
+ @ResponseBody
+ public Object updateProxyAddr(@RequestParam String proxyAddr) {
+ proxyService.updateProxyAddrList(proxyAddr);
+ return true;
+ }
+}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
index 0d19af9..db11c41 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/model/GroupConsumeInfo.java
@@ -19,12 +19,15 @@ package org.apache.rocketmq.dashboard.model;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import java.util.List;
+
public class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group;
private String version;
private int count;
private ConsumeType consumeType;
private MessageModel messageModel;
+ private List<String> address;
private int consumeTps;
private long diffTotal = -1;
private String subGroupType = "NORMAL";
@@ -70,6 +73,22 @@ public class GroupConsumeInfo implements
Comparable<GroupConsumeInfo> {
this.diffTotal = diffTotal;
}
+ public List<String> getAddress() {
+ return address;
+ }
+
+ public void setAddress(List<String> address) {
+ this.address = address;
+ }
+
+ public String getSubGroupType() {
+ return subGroupType;
+ }
+
+ public void setSubGroupType(String subGroupType) {
+ this.subGroupType = subGroupType;
+ }
+
@Override
public int compareTo(GroupConsumeInfo o) {
if (this.count != o.count) {
@@ -93,12 +112,4 @@ public class GroupConsumeInfo implements
Comparable<GroupConsumeInfo> {
public void setVersion(String version) {
this.version = version;
}
-
- public String getSubGroupType() {
- return subGroupType;
- }
-
- public void setSubGroupType(String subGroupType) {
- this.subGroupType = subGroupType;
- }
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
index c475931..e284c44 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ConsumerService.java
@@ -31,12 +31,12 @@ import java.util.Map;
import java.util.Set;
public interface ConsumerService {
- List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup);
+ List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup,String address);
- GroupConsumeInfo queryGroup(String consumerGroup);
+ GroupConsumeInfo queryGroup(String consumerGroup, String address);
- List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName);
+ List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String groupName,
String address);
List<TopicConsumerInfo> queryConsumeStatsList(String topic, String
groupName);
@@ -52,7 +52,7 @@ public interface ConsumerService {
Set<String> fetchBrokerNameSetBySubscriptionGroup(String group);
- ConsumerConnection getConsumerConnection(String consumerGroup);
+ ConsumerConnection getConsumerConnection(String consumerGroup, String
address);
ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String
clientId, boolean jstack);
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
new file mode 100644
index 0000000..2a64680
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/ProxyService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service;
+
+import java.util.Map;
+
+public interface ProxyService {
+
+ void addProxyAddrList(String proxyAddr);
+
+ void updateProxyAddrList(String proxyAddr);
+
+ Map<String, Object> getProxyHomePage();
+}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
index 360c0e3..0281c5c 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
@@ -627,7 +627,7 @@ public class MQAdminExtImpl implements MQAdminExt {
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
// TODO Auto-generated method stub
- throw new UnsupportedOperationException("Unimplemented method
'examineConsumeStats'");
+ return
MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(brokerAddr,
consumerGroup, topicName, timeoutMillis);
}
@Override
@@ -639,8 +639,7 @@ public class MQAdminExtImpl implements MQAdminExt {
@Override
public ConsumerConnection examineConsumerConnectionInfo(String
consumerGroup, String brokerAddr)
throws InterruptedException, MQBrokerException, RemotingException,
MQClientException {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Unimplemented method
'examineConsumerConnectionInfo'");
+ return
MQAdminInstance.threadLocalMQAdminExt().examineConsumerConnectionInfo(consumerGroup,
brokerAddr);
}
@Override
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
new file mode 100644
index 0000000..4344c7c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdmin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.client;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+
+public interface ProxyAdmin {
+
+ ConsumerConnection examineConsumerConnectionInfo(String addr, String
consumerGroup) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException;
+}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
new file mode 100644
index 0000000..eadae12
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/client/ProxyAdminImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.client;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import static
org.apache.rocketmq.remoting.protocol.RequestCode.GET_CONSUMER_CONNECTION_LIST;
+
+@Slf4j
+@Service
+public class ProxyAdminImpl implements ProxyAdmin {
+ @Autowired
+ private GenericObjectPool<MQAdminExt> mqAdminExtPool;
+
+ @Override
+ public ConsumerConnection examineConsumerConnectionInfo(String addr,
String consumerGroup) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
+ try {
+ MQAdminInstance.createMQAdmin(mqAdminExtPool);
+ RemotingClient remotingClient =
MQAdminInstance.threadLocalRemotingClient();
+ GetConsumerConnectionListRequestHeader requestHeader = new
GetConsumerConnectionListRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(GET_CONSUMER_CONNECTION_LIST,
requestHeader);
+ RemotingCommand response = remotingClient.invokeSync(addr,
request, 3000);
+ switch (response.getCode()) {
+ case 0:
+ return ConsumerConnection.decode(response.getBody(),
ConsumerConnection.class);
+ default:
+ throw new MQBrokerException(response.getCode(),
response.getRemark(), addr);
+ }
+ } finally {
+ MQAdminInstance.returnMQAdmin(mqAdminExtPool);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
index a1cf9ff..9bc37ab 100644
---
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java
@@ -23,8 +23,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -77,6 +80,8 @@ import org.springframework.stereotype.Service;
public class ConsumerServiceImpl extends AbstractCommonService implements
ConsumerService, InitializingBean, DisposableBean {
private Logger logger = LoggerFactory.getLogger(ConsumerServiceImpl.class);
+ @Resource
+ protected ProxyAdmin proxyAdmin;
@Resource
private RMQConfigure configure;
@@ -119,25 +124,33 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
}
@Override
- public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup) {
- Set<String> consumerGroupSet = Sets.newHashSet();
+ public List<GroupConsumeInfo> queryGroupList(boolean skipSysGroup, String
address) {
+ HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
for (BrokerData brokerData :
clusterInfo.getBrokerAddrTable().values()) {
SubscriptionGroupWrapper subscriptionGroupWrapper =
mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
-
consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+ for (String groupName :
subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
+ if (!consumerGroupMap.containsKey(groupName)) {
+ consumerGroupMap.putIfAbsent(groupName, new
ArrayList<>());
+ }
+ List<String> addresses = consumerGroupMap.get(groupName);
+ addresses.add(brokerData.selectBrokerAddr());
+ consumerGroupMap.put(groupName, addresses);
+ }
}
- }
- catch (Exception err) {
+ } catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
List<GroupConsumeInfo> groupConsumeInfoList =
Collections.synchronizedList(Lists.newArrayList());
- CountDownLatch countDownLatch = new
CountDownLatch(consumerGroupSet.size());
- for (String consumerGroup : consumerGroupSet) {
+ CountDownLatch countDownLatch = new
CountDownLatch(consumerGroupMap.size());
+ for (Map.Entry<String, List<String>> entry :
consumerGroupMap.entrySet()) {
+ String consumerGroup = entry.getKey();
executorService.submit(() -> {
try {
- GroupConsumeInfo consumeInfo = queryGroup(consumerGroup);
+ GroupConsumeInfo consumeInfo = queryGroup(consumerGroup,
address);
+ consumeInfo.setAddress(entry.getValue());
groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}",
consumerGroup, e);
@@ -165,7 +178,7 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
}
@Override
- public GroupConsumeInfo queryGroup(String consumerGroup) {
+ public GroupConsumeInfo queryGroup(String consumerGroup, String address) {
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
try {
ConsumeStats consumeStats = null;
@@ -182,9 +195,12 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
.allMatch(SubscriptionGroupConfig::isConsumeMessageOrderly);
try {
- consumerConnection =
mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
- }
- catch (Exception e) {
+ if (StringUtils.isNotEmpty(address)) {
+ consumerConnection =
proxyAdmin.examineConsumerConnectionInfo(address, consumerGroup);
+ } else {
+ consumerConnection =
mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
+ }
+ } catch (Exception e) {
logger.warn("examineConsumeStats exception to consumerGroup
{}, response [{}]", consumerGroup, e.getMessage());
}
@@ -217,8 +233,18 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
}
@Override
- public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String
groupName) {
- return queryConsumeStatsList(null, groupName);
+ public List<TopicConsumerInfo> queryConsumeStatsListByGroupName(String
groupName, String address) {
+ ConsumeStats consumeStats;
+ String topic = null;
+ try {
+ String[] addresses = address.split(",");
+ String addr = addresses[0];
+ consumeStats = mqAdminExt.examineConsumeStats(addr, groupName,
null, 3000);
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ return toTopicConsumerInfoList(topic, consumeStats, groupName);
}
@Override
@@ -231,6 +257,10 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
+ return toTopicConsumerInfoList(topic, consumeStats, groupName);
+ }
+
+ private List<TopicConsumerInfo> toTopicConsumerInfoList(String topic,
ConsumeStats consumeStats, String groupName) {
List<MessageQueue> mqList =
Lists.newArrayList(Iterables.filter(consumeStats.getOffsetTable().keySet(), new
Predicate<MessageQueue>() {
@Override
public boolean apply(MessageQueue o) {
@@ -431,11 +461,12 @@ public class ConsumerServiceImpl extends
AbstractCommonService implements Consum
}
@Override
- public ConsumerConnection getConsumerConnection(String consumerGroup) {
+ public ConsumerConnection getConsumerConnection(String consumerGroup,
String address) {
try {
- return mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
- }
- catch (Exception e) {
+ String[] addresses = address.split(",");
+ String addr = addresses[0];
+ return mqAdminExt.examineConsumerConnectionInfo(consumerGroup,
addr);
+ } catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
diff --git
a/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
new file mode 100644
index 0000000..07e63b3
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/dashboard/service/impl/ProxyServiceImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.dashboard.service.impl;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.dashboard.config.RMQConfigure;
+import org.apache.rocketmq.dashboard.service.ProxyService;
+import org.apache.rocketmq.dashboard.service.client.ProxyAdmin;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+@Service
+public class ProxyServiceImpl implements ProxyService {
+ @Resource
+ protected ProxyAdmin proxyAdmin;
+ @Resource
+ private RMQConfigure configure;
+
+ @Override
+ public void addProxyAddrList(String proxyAddr) {
+ List<String> proxyAddrs = configure.getProxyAddrs();
+ if (proxyAddrs != null && !proxyAddrs.contains(proxyAddr)) {
+ proxyAddrs.add(proxyAddr);
+ }
+ configure.setProxyAddrs(proxyAddrs);
+ }
+
+ @Override
+ public void updateProxyAddrList(String proxyAddr) {
+ configure.setProxyAddr(proxyAddr);
+ }
+
+ @Override
+ public Map<String, Object> getProxyHomePage() {
+ Map<String, Object> homePageInfoMap = Maps.newHashMap();
+ homePageInfoMap.put("currentProxyAddr", configure.getProxyAddr());
+ homePageInfoMap.put("proxyAddrList", configure.getProxyAddrs());
+ return homePageInfoMap;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
index 710929b..3c8a77e 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/task/MonitorTask.java
@@ -40,7 +40,7 @@ public class MonitorTask {
// @Scheduled(cron = "* * * * * ?")
public void scanProblemConsumeGroup() {
for (Map.Entry<String, ConsumerMonitorConfig> configEntry :
monitorService.queryConsumerMonitorConfig().entrySet()) {
- GroupConsumeInfo consumeInfo =
consumerService.queryGroup(configEntry.getKey());
+ GroupConsumeInfo consumeInfo =
consumerService.queryGroup(configEntry.getKey(), null);
if (consumeInfo.getCount() < configEntry.getValue().getMinCount()
|| consumeInfo.getDiffTotal() > configEntry.getValue().getMaxDiffTotal()) {
logger.info("op=look consumeInfo {}",
JsonUtil.obj2String(consumeInfo)); // notify the alert system
}
diff --git a/src/main/resources/application.yml
b/src/main/resources/application.yml
index 090e421..fe4d283 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -59,6 +59,9 @@ rocketmq:
# must create userInfo file: ${rocketmq.config.dataPath}/users.properties
if the login is required
loginRequired: false
useTLS: false
+ proxyAddr: 127.0.0.1:8080
+ proxyAddrs:
+ - 127.0.0.1:8080
# set the accessKey and secretKey if you used acl
# accessKey: rocketmq2
# secretKey: 12345678
diff --git a/src/main/resources/static/index.html
b/src/main/resources/static/index.html
index c2bf349..ee3c3fe 100644
--- a/src/main/resources/static/index.html
+++ b/src/main/resources/static/index.html
@@ -104,6 +104,7 @@
<script type="text/javascript"
src="src/tools/tools.js?v=201703171710"></script>
<script type="text/javascript" src="src/cluster.js?timestamp=4"></script>
<script type="text/javascript" src="src/topic.js"></script>
+<script type="text/javascript" src="src/proxy.js"></script>
<script type="text/javascript" src="src/consumer.js?timestamp=6"></script>
<script type="text/javascript" src="src/producer.js"></script>
<script type="text/javascript" src="src/message.js"></script>
diff --git a/src/main/resources/static/src/app.js
b/src/main/resources/static/src/app.js
index a7ca1be..1bbb650 100644
--- a/src/main/resources/static/src/app.js
+++ b/src/main/resources/static/src/app.js
@@ -213,6 +213,9 @@ app.config(['$routeProvider',
'$httpProvider','$cookiesProvider','getDictNamePro
}).when('/ops', {
templateUrl: 'view/pages/ops.html',
controller:'opsController'
+ }).when('/proxy', {
+ templateUrl: 'view/pages/proxy.html',
+ controller:'proxyController'
}).when('/acl', {
templateUrl: 'view/pages/acl.html',
controller: 'aclController'
diff --git a/src/main/resources/static/src/consumer.js
b/src/main/resources/static/src/consumer.js
index 8c0833e..d192334 100644
--- a/src/main/resources/static/src/consumer.js
+++ b/src/main/resources/static/src/consumer.js
@@ -79,6 +79,7 @@ module.controller('consumerController', ['$scope',
'ngDialog', '$http', 'Notific
url: "consumer/groupList.query",
params: {
skipSysGroup: false,
+ address: localStorage.getItem('isV5') ?
localStorage.getItem('proxyAddr') : null
}
}).success(function (resp) {
if (resp.status == 0) {
@@ -243,11 +244,11 @@ module.controller('consumerController', ['$scope',
'ngDialog', '$http', 'Notific
}
});
};
- $scope.detail = function (consumerGroupName) {
+ $scope.detail = function (consumerGroupName, address) {
$http({
method: "GET",
url: "consumer/queryTopicByConsumer.query",
- params: {consumerGroup: consumerGroupName}
+ params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
@@ -262,11 +263,11 @@ module.controller('consumerController', ['$scope',
'ngDialog', '$http', 'Notific
});
};
- $scope.client = function (consumerGroupName) {
+ $scope.client = function (consumerGroupName, address) {
$http({
method: "GET",
url: "consumer/consumerConnection.query",
- params: {consumerGroup: consumerGroupName}
+ params: {consumerGroup: consumerGroupName, address: address}
}).success(function (resp) {
if (resp.status == 0) {
console.log(resp);
diff --git a/src/main/resources/static/src/i18n/en.js
b/src/main/resources/static/src/i18n/en.js
index 6bc16cd..83083d7 100644
--- a/src/main/resources/static/src/i18n/en.js
+++ b/src/main/resources/static/src/i18n/en.js
@@ -100,6 +100,7 @@ var en = {
"RESET_OFFSET":"resetOffset",
"CLUSTER_NAME":"clusterName",
"OPS":"OPS",
+ "PROXY":"Proxy",
"AUTO_REFRESH":"AUTO_REFRESH",
"REFRESH":"REFRESH",
"LOGOUT":"Logout",
diff --git a/src/main/resources/static/src/i18n/zh.js
b/src/main/resources/static/src/i18n/zh.js
index f71ae34..f8c3c1d 100644
--- a/src/main/resources/static/src/i18n/zh.js
+++ b/src/main/resources/static/src/i18n/zh.js
@@ -101,6 +101,7 @@ var zh = {
"RESET_OFFSET":"重置位点",
"CLUSTER_NAME":"集群名",
"OPS":"运维",
+ "PROXY":"代理",
"AUTO_REFRESH":"自动刷新",
"REFRESH":"刷新",
"LOGOUT":"退出",
diff --git a/src/main/resources/static/src/proxy.js
b/src/main/resources/static/src/proxy.js
new file mode 100644
index 0000000..4461b09
--- /dev/null
+++ b/src/main/resources/static/src/proxy.js
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+var module = app;
+module.controller('proxyController', ['$scope', '$location', '$http',
'Notification', 'remoteApi', 'tools', '$window',
+ function ($scope, $location, $http, Notification, remoteApi, tools,
$window) {
+ $scope.proxyAddrList = [];
+ $scope.userRole = $window.sessionStorage.getItem("userrole");
+ $scope.writeOperationEnabled = $scope.userRole == null ? true :
($scope.userRole == 1 ? true : false);
+ $scope.inputReadonly = !$scope.writeOperationEnabled;
+ $scope.newProxyAddr = "";
+ $scope.allProxyConfig = {};
+
+ $http({
+ method: "GET",
+ url: "proxy/homePage.query"
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ $scope.proxyAddrList = resp.data.proxyAddrList;
+ $scope.selectedProxy = resp.data.currentProxyAddr;
+ $scope.showProxyDetailConfig($scope.selectedProxy);
+ localStorage.setItem('proxyAddr',$scope.selectedProxy);
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+
+ $scope.eleChange = function (data) {
+ $scope.proxyAddrList = data;
+ }
+ $scope.showDetailConf = function () {
+ $(".proxyModal").modal();
+ }
+
+
+ $scope.showProxyDetailConfig = function (proxyAddr) {
+ $http({
+ method: "GET",
+ url: "proxy/proxyDetailConfig.query",
+ params: {proxyAddress: proxyAddr}
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ $scope.allProxyConfig = resp.data;
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ };
+
+ $scope.updateProxyAddr = function () {
+ $http({
+ method: "POST",
+ url: "proxy/updateProxyAddr.do",
+ params: {proxyAddr: $scope.selectedProxy}
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ localStorage.setItem('proxyAddr', $scope.selectedProxy);
+ Notification.info({message: "SUCCESS", delay: 2000});
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ $scope.showProxyDetailConfig($scope.selectedProxy);
+ };
+
+ $scope.addProxyAddr = function () {
+ $http({
+ method: "POST",
+ url: "proxy/addProxyAddr.do",
+ params: {newProxyAddr: $scope.newProxyAddr}
+ }).success(function (resp) {
+ if (resp.status == 0) {
+ if ($scope.proxyAddrList.indexOf($scope.newProxyAddr) ==
-1) {
+ $scope.proxyAddrList.push($scope.newProxyAddr);
+ }
+ $("#proxyAddr").val("");
+ $scope.newProxyAddr = "";
+ Notification.info({message: "SUCCESS", delay: 2000});
+ } else {
+ Notification.error({message: resp.errMsg, delay: 2000});
+ }
+ });
+ };
+ }])
diff --git a/src/main/resources/static/view/layout/_header.html
b/src/main/resources/static/view/layout/_header.html
index a78b9f2..8159138 100644
--- a/src/main/resources/static/view/layout/_header.html
+++ b/src/main/resources/static/view/layout/_header.html
@@ -28,6 +28,7 @@
<div class="navbar-collapse collapse navbar-warning-collapse">
<ul class="nav navbar-nav">
<li ng-class="path =='ops' ? 'active':''"><a
ng-href="#/ops">{{'OPS' | translate}}</a></li>
+ <li ng-show="rmqVersion" ng-class="path =='proxy' ?
'active':''"><a ng-href="#/proxy">{{'PROXY' | translate}}</a></li>
<li ng-class="path =='dashboard' || path ==''? 'active':''"><a
ng-href="#/">{{'DASHBOARD' | translate}}</a></li>
<li ng-class="path =='cluster' ? 'active':''"><a
ng-href="#/cluster">{{'CLUSTER' | translate}}</a></li>
<li ng-class="path =='topic' ? 'active':''"><a
ng-href="#/topic">{{'TOPIC' | translate}}</a></li>
diff --git a/src/main/resources/static/view/pages/consumer.html
b/src/main/resources/static/view/pages/consumer.html
index 47fddad..d883afc 100644
--- a/src/main/resources/static/view/pages/consumer.html
+++ b/src/main/resources/static/view/pages/consumer.html
@@ -66,11 +66,11 @@
<td
class="text-center">{{consumerGroup.consumeTps}}</td>
<td
class="text-center">{{consumerGroup.diffTotal}}</td>
<td class="text-left">
- <button name="client"
ng-click="client(consumerGroup.group)"
+ <button name="client"
ng-click="client(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary"
type="button">{{'CLIENT' | translate}}
</button>
- <button name="client"
ng-click="detail(consumerGroup.group)"
+ <button name="client"
ng-click="detail(consumerGroup.group, consumerGroup.address)"
class="btn btn-raised btn-sm btn-primary"
type="button">{{'CONSUME_DETAIL' |
translate}}
</button>
diff --git a/src/main/resources/static/view/pages/proxy.html
b/src/main/resources/static/view/pages/proxy.html
new file mode 100644
index 0000000..43f34ce
--- /dev/null
+++ b/src/main/resources/static/view/pages/proxy.html
@@ -0,0 +1,67 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<div class="container-fluid" id="deployHistoryList">
+ <div class="page-content">
+ <h2 class="md-title">ProxyServerAddressList</h2>
+ <div class="pull-left" style="min-width: 400px; max-width: 500px;
padding: 10px 10px 10px 0">
+ <select ng-model="selectedProxy" chosen
+ ng-options="x for x in proxyAddrList"
+ ng-change="updateProxyAddr()"
+ required></select>
+ </div>
+ <div class="pull-left">
+ <button class="btn btn-raised btn-sm btn-primary" type="button"
ng-show="{{writeOperationEnabled}}"
+ ng-click="updateProxyAddr()">{{'UPDATE' | translate}}
+ </button>
+ </div>
+ <form class="form-inline pull-left" style="margin-left: 20px"
ng-show="{{writeOperationEnabled}}">
+ <div class="form-group" style="margin: 0">
+ <label for="proxyAddr">ProxyAddr:</label>
+ <input id="proxyAddr" class="form-control" style="width:
300px; margin: 0 10px 0 10px" type="text" ng-model="newProxyAddr" required/>
+ <button class="btn btn-raised btn-sm btn-primary" type="button"
+ ng-click="addProxyAddr()"> {{ 'ADD' | translate}}
+ </button>
+ </div>
+ </form>
+ </div>
+</div>
+
+<div class="modal proxyModal fade" role="dialog" tabindex="-1"
aria-hidden="true" aria-labelledby="config-modal-label">
+ <div class="modal-dialog modal-lg">
+ <div class="modal-content" >
+ <div class="modal-header">
+ <button class="close" type="button"
data-dismiss="modal">×</button>
+ <h4 id="config-modal-label" class="modal-title">
+ [{{selectedProxy}}]
+ </h4>
+ </div>
+ <div class="modal-body limit_height">
+ <table class="table table-bordered">
+ <tr ng-repeat="(key, value) in allProxyConfig">
+ <td>{{key}}</td>
+ <td>{{value}}</td>
+ </tr>
+ </table>
+ </div>
+ <div class="modal-footer">
+ <div class="col-md-12 text-center">
+ <button type="button" class="btn btn-raised"
data-dismiss="modal">{{ 'CLOSE' | translate }}</button>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>