This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 92c922378a [ISSUE #8358] Client does not send heartbeats to all
NameServers in clustered mode, resulting in frequent disconnections (#8359)
92c922378a is described below
commit 92c922378aa7c92e4239f0b46be8ea97ed257c2e
Author: weihubeats <[email protected]>
AuthorDate: Thu Jul 4 08:53:07 2024 +0800
[ISSUE #8358] Client does not send heartbeats to all NameServers in clustered
mode, resulting in frequent disconnections (#8359)
* Adding null does not update
* rolling back
* remove client scanAvailableNameSrv
---
.../client/impl/factory/MQClientInstance.java | 1 +
.../rocketmq/remoting/netty/NettyClientConfig.java | 10 ++++++++
.../remoting/netty/NettyRemotingClient.java | 30 ++++++++++++----------
3 files changed, 28 insertions(+), 13 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index b4ebf69273..c9fd3c83e0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -152,6 +152,7 @@ public class MQClientInstance {
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.nettyClientConfig.setSocksProxyConfig(clientConfig.getSocksProxyConfig());
+ this.nettyClientConfig.setScanAvailableNameSrv(false);
ClientRemotingProcessor clientRemotingProcessor = new
ClientRemotingProcessor(this);
ChannelEventListener channelEventListener;
if (clientConfig.isEnableHeartbeatChannelEventListener()) {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index c28288786a..7b7263e27a 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -31,6 +31,8 @@ public class NettyClientConfig {
private int connectTimeoutMillis = NettySystemConfig.connectTimeoutMillis;
private long channelNotActiveInterval = 1000 * 60;
+ private boolean isScanAvailableNameSrv = true;
+
/**
* IdleStateEvent will be triggered when neither read nor write was
performed for
* the specified period of this time. Specify {@code 0} to disable
@@ -218,4 +220,12 @@ public class NettyClientConfig {
public void setSocksProxyConfig(String socksProxyConfig) {
this.socksProxyConfig = socksProxyConfig;
}
+
+ public boolean isScanAvailableNameSrv() {
+ return isScanAvailableNameSrv;
+ }
+
+ public void setScanAvailableNameSrv(boolean scanAvailableNameSrv) {
+ this.isScanAvailableNameSrv = scanAvailableNameSrv;
+ }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 1bc5e57db5..1d595f32b9 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -251,20 +251,24 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
};
this.timer.newTimeout(timerTaskScanResponseTable, 1000 * 3,
TimeUnit.MILLISECONDS);
- int connectTimeoutMillis =
this.nettyClientConfig.getConnectTimeoutMillis();
- TimerTask timerTaskScanAvailableNameSrv = new TimerTask() {
- @Override
- public void run(Timeout timeout) {
- try {
- NettyRemotingClient.this.scanAvailableNameSrv();
- } catch (Exception e) {
- LOGGER.error("scanAvailableNameSrv exception", e);
- } finally {
- timer.newTimeout(this, connectTimeoutMillis,
TimeUnit.MILLISECONDS);
+ if (nettyClientConfig.isScanAvailableNameSrv()) {
+ int connectTimeoutMillis =
this.nettyClientConfig.getConnectTimeoutMillis();
+ TimerTask timerTaskScanAvailableNameSrv = new TimerTask() {
+ @Override
+ public void run(Timeout timeout) {
+ try {
+ NettyRemotingClient.this.scanAvailableNameSrv();
+ } catch (Exception e) {
+ LOGGER.error("scanAvailableNameSrv exception", e);
+ } finally {
+ timer.newTimeout(this, connectTimeoutMillis,
TimeUnit.MILLISECONDS);
+ }
}
- }
- };
- this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0,
TimeUnit.MILLISECONDS);
+ };
+ this.timer.newTimeout(timerTaskScanAvailableNameSrv, 0,
TimeUnit.MILLISECONDS);
+ }
+
+
}
private Map.Entry<String, SocksProxyConfig> getProxy(String addr) {