This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 775eb64b87 [ISSUE #9288] Support the disablement of producer
registration and fast channel shutdown (#9293)
775eb64b87 is described below
commit 775eb64b877f6eb20b76e02e78c3ea14119806d9
Author: ymwneu <[email protected]>
AuthorDate: Fri Apr 11 15:45:00 2025 +0800
[ISSUE #9288] Support the disablement of producer registration and fast
channel shutdown (#9293)
---
.../client/ClientChannelAttributeHelper.java | 77 ++++++++++++++++++++
.../rocketmq/broker/client/ConsumerManager.java | 39 ++++++++++
.../rocketmq/broker/client/ProducerManager.java | 82 +++++++++++++++++++---
.../broker/client/ProducerManagerTest.java | 19 ++++-
.../rocketmq/client/impl/MQClientManager.java | 4 ++
.../client/impl/factory/MQClientInstance.java | 8 +++
.../org/apache/rocketmq/common/BrokerConfig.java | 38 ++++++++++
7 files changed, 257 insertions(+), 10 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelAttributeHelper.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelAttributeHelper.java
new file mode 100644
index 0000000000..29085398d0
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelAttributeHelper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+import io.netty.util.AttributeKey;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ClientChannelAttributeHelper {
+ private static final AttributeKey<String> ATTR_CG =
AttributeKey.valueOf("CHANNEL_CONSUMER_GROUP");
+ private static final AttributeKey<String> ATTR_PG =
AttributeKey.valueOf("CHANNEL_PRODUCER_GROUP");
+ private static final String SEPARATOR = "|";
+
+ public static void addProducerGroup(Channel channel, String group) {
+ addGroup(channel, group, ATTR_PG);
+ }
+
+ public static void addConsumerGroup(Channel channel, String group) {
+ addGroup(channel, group, ATTR_CG);
+ }
+
+ public static List<String> getProducerGroups(Channel channel) {
+ return getGroups(channel, ATTR_PG);
+ }
+
+ public static List<String> getConsumerGroups(Channel channel) {
+ return getGroups(channel, ATTR_CG);
+ }
+
+ private static void addGroup(Channel channel, String group,
AttributeKey<String> key) {
+ if (null == channel || !channel.isActive()) { // no side effect if
check active status.
+ return;
+ }
+ if (null == group || group.length() == 0 || null == key) {
+ return;
+ }
+ String groups = channel.attr(key).get();
+ if (null == groups) {
+ channel.attr(key).set(group + SEPARATOR);
+ } else {
+ if (groups.contains(SEPARATOR + group + SEPARATOR)) {
+ return;
+ } else {
+ channel.attr(key).compareAndSet(groups, groups + group +
SEPARATOR);
+ }
+ }
+ }
+
+ private static List<String> getGroups(Channel channel,
AttributeKey<String> key) {
+ if (null == channel) {
+ return Collections.emptyList();
+ }
+ if (null == key) {
+ return Collections.emptyList();
+ }
+ String groups = channel.attr(key).get();
+ return null == groups ? Collections.<String>emptyList() :
Arrays.asList(groups.split("\\|"));
+ }
+
+}
\ No newline at end of file
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index b1057e2a8d..c658b128eb 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -48,12 +48,14 @@ public class ConsumerManager {
protected final BrokerStatsManager brokerStatsManager;
private final long channelExpiredTimeout;
private final long subscriptionExpiredTimeout;
+ private final BrokerConfig brokerConfig;
public ConsumerManager(final ConsumerIdsChangeListener
consumerIdsChangeListener, long expiredTimeout) {
this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
this.brokerStatsManager = null;
this.channelExpiredTimeout = expiredTimeout;
this.subscriptionExpiredTimeout = expiredTimeout;
+ this.brokerConfig = null;
}
public ConsumerManager(final ConsumerIdsChangeListener
consumerIdsChangeListener,
@@ -62,6 +64,7 @@ public class ConsumerManager {
this.brokerStatsManager = brokerStatsManager;
this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
this.subscriptionExpiredTimeout =
brokerConfig.getSubscriptionExpiredTimeout();
+ this.brokerConfig = brokerConfig;
}
public ClientChannelInfo findChannel(final String group, final String
clientId) {
@@ -130,12 +133,43 @@ public class ConsumerManager {
public boolean doChannelCloseEvent(final String remoteAddr, final Channel
channel) {
boolean removed = false;
+ if (this.brokerConfig != null &&
this.brokerConfig.isEnableFastChannelEventProcess()) {
+ List<String> groups =
ClientChannelAttributeHelper.getConsumerGroups(channel);
+ if (this.brokerConfig.isPrintChannelGroups() && groups.size() >= 5
&& groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
+ LOGGER.warn("channel close event, too many consumer groups one
channel, {}, {}, {}", groups.size(), remoteAddr, groups);
+ }
+ for (String group : groups) {
+ if (null == group || group.length() == 0) {
+ continue;
+ }
+ ConsumerGroupInfo consumerGroupInfo =
this.consumerTable.get(group);
+ if (null == consumerGroupInfo) {
+ continue;
+ }
+ ClientChannelInfo clientChannelInfo =
consumerGroupInfo.doChannelCloseEvent(remoteAddr, channel);
+ if (clientChannelInfo != null) {
+ removed = true;
+
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group,
clientChannelInfo, consumerGroupInfo.getSubscribeTopics());
+ if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
+ ConsumerGroupInfo remove =
this.consumerTable.remove(group);
+ if (remove != null) {
+ LOGGER.info("unregister consumer ok, no any
connection, and remove consumer group, {}",
+ group);
+
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
+ }
+ }
+ callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE,
group, consumerGroupInfo.getAllChannel());
+ }
+ }
+ return removed;
+ }
Iterator<Entry<String, ConsumerGroupInfo>> it =
this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
ConsumerGroupInfo info = next.getValue();
ClientChannelInfo clientChannelInfo =
info.doChannelCloseEvent(remoteAddr, channel);
if (clientChannelInfo != null) {
+ removed = true;
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER,
next.getKey(), clientChannelInfo, info.getSubscribeTopics());
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove =
this.consumerTable.remove(next.getKey());
@@ -201,6 +235,11 @@ public class ConsumerManager {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE,
group, consumerGroupInfo.getAllChannel());
}
}
+
+ if (this.brokerConfig != null &&
this.brokerConfig.isEnableFastChannelEventProcess() && r1) {
+
ClientChannelAttributeHelper.addConsumerGroup(clientChannelInfo.getChannel(),
group);
+ }
+
if (null != this.brokerStatsManager) {
this.brokerStatsManager.incConsumerRegisterTime((int)
(System.currentTimeMillis() - start));
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 2c3acb6ba9..bc8400c19a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -44,15 +45,23 @@ public class ProducerManager {
new ConcurrentHashMap<>();
private final ConcurrentMap<String, Channel> clientChannelTable = new
ConcurrentHashMap<>();
protected final BrokerStatsManager brokerStatsManager;
+ private final BrokerConfig brokerConfig;
private final PositiveAtomicCounter positiveAtomicCounter = new
PositiveAtomicCounter();
private final List<ProducerChangeListener> producerChangeListenerList =
new CopyOnWriteArrayList<>();
public ProducerManager() {
this.brokerStatsManager = null;
+ this.brokerConfig = null;
}
public ProducerManager(final BrokerStatsManager brokerStatsManager) {
this.brokerStatsManager = brokerStatsManager;
+ this.brokerConfig = null;
+ }
+
+ public ProducerManager(final BrokerStatsManager brokerStatsManager, final
BrokerConfig brokerConfig) {
+ this.brokerStatsManager = brokerStatsManager;
+ this.brokerConfig = brokerConfig;
}
public int groupSize() {
@@ -136,6 +145,39 @@ public class ProducerManager {
public boolean doChannelCloseEvent(final String remoteAddr, final Channel
channel) {
boolean removed = false;
if (channel != null) {
+ if (this.brokerConfig != null &&
this.brokerConfig.isEnableFastChannelEventProcess()) {
+ List<String> groups =
ClientChannelAttributeHelper.getProducerGroups(channel);
+ if (this.brokerConfig.isPrintChannelGroups() && groups.size()
>= 5 && groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
+ log.warn("channel close event, too many producer groups
one channel, {}, {}, {}", groups.size(), remoteAddr, groups);
+ }
+ for (String group : groups) {
+ if (null == group || group.length() == 0) {
+ continue;
+ }
+ ConcurrentMap<Channel, ClientChannelInfo>
clientChannelInfoTable = this.groupChannelTable.get(group);
+ if (null == clientChannelInfoTable) {
+ continue;
+ }
+ final ClientChannelInfo clientChannelInfo =
+ clientChannelInfoTable.remove(channel);
+ if (clientChannelInfo != null) {
+
clientChannelTable.remove(clientChannelInfo.getClientId());
+ removed = true;
+ log.info(
+ "NETTY EVENT: remove channel[{}][{}] from
ProducerManager groupChannelTable, producer group: {}",
+ clientChannelInfo.toString(), remoteAddr,
group);
+
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group,
clientChannelInfo);
+ if (clientChannelInfoTable.isEmpty()) {
+ ConcurrentMap<Channel, ClientChannelInfo>
oldGroupTable = this.groupChannelTable.remove(group);
+ if (oldGroupTable != null) {
+ log.info("unregister a producer group[{}] from
groupChannelTable", group);
+
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
+ }
+ }
+ }
+ }
+ return removed; // must return here, degrade to
scanNotActiveChannel at worst.
+ }
for (final Map.Entry<String, ConcurrentMap<Channel,
ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
final String group = entry.getKey();
final ConcurrentMap<Channel, ClientChannelInfo>
clientChannelInfoTable = entry.getValue();
@@ -162,20 +204,37 @@ public class ProducerManager {
}
public void registerProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
+
+ long start = System.currentTimeMillis();
ClientChannelInfo clientChannelInfoFound;
ConcurrentMap<Channel, ClientChannelInfo> channelTable =
this.groupChannelTable.get(group);
+ // note that we must take care of the exist groups and channels,
+ // only can return when groups or channels not exist.
+ if (this.brokerConfig != null
+ && !this.brokerConfig.isEnableRegisterProducer()
+ && this.brokerConfig.isRejectTransactionMessage()) {
+ boolean needRegister = true;
+ if (null == channelTable) {
+ needRegister = false;
+ } else {
+ clientChannelInfoFound =
channelTable.get(clientChannelInfo.getChannel());
+ if (null == clientChannelInfoFound) {
+ needRegister = false;
+ }
+ }
+ if (!needRegister) {
+ if (null != this.brokerStatsManager) {
+ this.brokerStatsManager.incProducerRegisterTime((int)
(System.currentTimeMillis() - start));
+ }
+ return;
+ }
+ }
+
if (null == channelTable) {
channelTable = new ConcurrentHashMap<>();
- // Make sure channelTable will NOT be cleaned by
#scanNotActiveChannel
- channelTable.put(clientChannelInfo.getChannel(),
clientChannelInfo);
ConcurrentMap<Channel, ClientChannelInfo> prev =
this.groupChannelTable.putIfAbsent(group, channelTable);
- if (null == prev) {
- // Add client-id to channel mapping for new producer group
- clientChannelTable.put(clientChannelInfo.getClientId(),
clientChannelInfo.getChannel());
- } else {
- channelTable = prev;
- }
+ channelTable = prev != null ? prev : channelTable;
}
clientChannelInfoFound =
channelTable.get(clientChannelInfo.getChannel());
@@ -184,12 +243,19 @@ public class ProducerManager {
channelTable.put(clientChannelInfo.getChannel(),
clientChannelInfo);
clientChannelTable.put(clientChannelInfo.getClientId(),
clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
+ if (this.brokerConfig != null &&
this.brokerConfig.isEnableFastChannelEventProcess()) {
+
ClientChannelAttributeHelper.addProducerGroup(clientChannelInfo.getChannel(),
group);
+ }
}
// Refresh existing client-channel-info update-timestamp
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
+
+ if (null != this.brokerStatsManager) {
+ this.brokerStatsManager.incProducerRegisterTime((int)
(System.currentTimeMillis() - start));
+ }
}
public void unregisterProducer(final String group, final ClientChannelInfo
clientChannelInfo) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index 3d6091e02f..451b0e044c 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -22,6 +22,8 @@ import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +38,8 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ProducerManagerTest {
+
+ private BrokerConfig brokerConfig;
private ProducerManager producerManager;
private String group = "FooBar";
private ClientChannelInfo clientInfo;
@@ -45,7 +49,8 @@ public class ProducerManagerTest {
@Before
public void init() {
- producerManager = new ProducerManager();
+ brokerConfig = new BrokerConfig();
+ producerManager = new ProducerManager(null, brokerConfig);
clientInfo = new ClientChannelInfo(channel, "clientId",
LanguageCode.JAVA, 0);
}
@@ -140,10 +145,20 @@ public class ProducerManagerTest {
}
@Test
- public void testRegisterProducer() throws Exception {
+ public void testRegisterProducer() {
+ brokerConfig.setEnableRegisterProducer(false);
+ brokerConfig.setRejectTransactionMessage(true);
producerManager.registerProducer(group, clientInfo);
Map<Channel, ClientChannelInfo> channelMap =
producerManager.getGroupChannelTable().get(group);
Channel channel1 = producerManager.findChannel("clientId");
+ assertThat(channelMap).isNull();
+ assertThat(channel1).isNull();
+
+ brokerConfig.setEnableRegisterProducer(true);
+ brokerConfig.setRejectTransactionMessage(false);
+ producerManager.registerProducer(group, clientInfo);
+ channelMap = producerManager.getGroupChannelTable().get(group);
+ channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull();
assertThat(channel1).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 02eaa66e99..ca6f461745 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -85,4 +85,8 @@ public class MQClientManager {
public void removeClientFactory(final String clientId) {
this.factoryTable.remove(clientId);
}
+
+ public ConcurrentMap<String, MQClientInstance> getFactoryTable() {
+ return factoryTable;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d2a4694bb0..3055f2cdee 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1395,6 +1395,14 @@ public class MQClientInstance {
return clientConfig;
}
+ public ConcurrentMap<String, MQProducerInner> getProducerTable() {
+ return producerTable;
+ }
+
+ public ConcurrentMap<String, MQConsumerInner> getConsumerTable() {
+ return consumerTable;
+ }
+
public TopicRouteData queryTopicRouteData(String topic) {
TopicRouteData data = this.getAnExistTopicRouteData(topic);
if (data == null) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 44f5e1eff0..a411ad496b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -428,6 +428,10 @@ public class BrokerConfig extends BrokerIdentity {
private long popInflightMessageThreshold = 10000;
private boolean enablePopMessageThreshold = false;
+ private boolean enableFastChannelEventProcess = false;
+ private boolean printChannelGroups = false;
+ private int printChannelGroupsMinNum = 5;
+
private int splitRegistrationSize = 800;
/**
@@ -457,6 +461,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean recallMessageEnable = false;
+ private boolean enableRegisterProducer = true;
+
private boolean enableCreateSysGroup = true;
public String getConfigBlackList() {
@@ -1915,6 +1921,30 @@ public class BrokerConfig extends BrokerIdentity {
this.enableSplitRegistration = enableSplitRegistration;
}
+ public boolean isEnableFastChannelEventProcess() {
+ return enableFastChannelEventProcess;
+ }
+
+ public void setEnableFastChannelEventProcess(boolean
enableFastChannelEventProcess) {
+ this.enableFastChannelEventProcess = enableFastChannelEventProcess;
+ }
+
+ public boolean isPrintChannelGroups() {
+ return printChannelGroups;
+ }
+
+ public void setPrintChannelGroups(boolean printChannelGroups) {
+ this.printChannelGroups = printChannelGroups;
+ }
+
+ public int getPrintChannelGroupsMinNum() {
+ return printChannelGroupsMinNum;
+ }
+
+ public void setPrintChannelGroupsMinNum(int printChannelGroupsMinNum) {
+ this.printChannelGroupsMinNum = printChannelGroupsMinNum;
+ }
+
public int getSplitRegistrationSize() {
return splitRegistrationSize;
}
@@ -2019,6 +2049,14 @@ public class BrokerConfig extends BrokerIdentity {
this.recallMessageEnable = recallMessageEnable;
}
+ public boolean isEnableRegisterProducer() {
+ return enableRegisterProducer;
+ }
+
+ public void setEnableRegisterProducer(boolean enableRegisterProducer) {
+ this.enableRegisterProducer = enableRegisterProducer;
+ }
+
public boolean isEnableCreateSysGroup() {
return enableCreateSysGroup;
}