This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a7d43d18 [ISSUE #9470] [Java] Support ReconnectEndpointsCommand (#1018)
a7d43d18 is described below
commit a7d43d18815e9cb666f659adfb045326bd8fc347
Author: ymwneu <[email protected]>
AuthorDate: Thu Jul 3 17:57:25 2025 +0800
[ISSUE #9470] [Java] Support ReconnectEndpointsCommand (#1018)
---
.../org/apache/rocketmq/client/java/impl/ClientImpl.java | 15 +++++++++++++++
.../rocketmq/client/java/impl/ClientSessionImpl.java | 9 +++++++++
.../client/java/impl/producer/ClientSessionHandler.java | 6 ++++++
java/pom.xml | 2 +-
4 files changed, 31 insertions(+), 1 deletion(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 4e5673cb..80e943fe 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -27,6 +27,7 @@ import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.ReconnectEndpointsCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.Status;
@@ -128,6 +129,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
private final ReadWriteLock sessionsLock;
private final CompositedMessageInterceptor compositedMessageInterceptor;
+ private boolean receiveReconnect = false;
public ClientImpl(ClientConfiguration clientConfiguration, Set<String>
topics) {
this.clientConfiguration = checkNotNull(clientConfiguration,
"clientConfiguration should not be null");
@@ -279,6 +281,11 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
return !totalRouteEndpoints.contains(endpoints);
}
+ @Override
+ public void onReconnectEndpointsCommand(Endpoints endpoints,
ReconnectEndpointsCommand command) {
+ receiveReconnect = true;
+ }
+
/**
* This method is invoked while request of printing thread stack trace is
received from remote.
*
@@ -517,6 +524,14 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
return clientId;
}
+ public boolean isReceiveReconnect() {
+ return receiveReconnect;
+ }
+
+ public void setReceiveReconnect(boolean receiveReconnect) {
+ this.receiveReconnect = receiveReconnect;
+ }
+
/**
* @see Client#doHeartbeat()
*/
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 8ec03132..29dd770c 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.client.java.impl;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.ReconnectEndpointsCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
@@ -156,6 +157,14 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
sessionHandler.onPrintThreadStackTraceCommand(endpoints,
printThreadStackTraceCommand);
break;
}
+ case RECONNECT_ENDPOINTS_COMMAND: {
+ final ReconnectEndpointsCommand reconnectEndpointsCommand =
+ command.getReconnectEndpointsCommand();
+ log.info("Receive reconnect endpoints command from remote,
endpoints={}, clientId={}",
+ endpoints, clientId);
+ sessionHandler.onReconnectEndpointsCommand(endpoints,
reconnectEndpointsCommand);
+ break;
+ }
default:
log.warn("Receive unrecognized command from remote,
endpoints={}, command={}, clientId={}",
endpoints, command, clientId);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 9e68692a..b6f9c559 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.client.java.impl.producer;
import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.ReconnectEndpointsCommand;
import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
@@ -97,4 +98,9 @@ public interface ClientSessionHandler {
* Event processor for {@link TelemetryCommand}.
*/
void onPrintThreadStackTraceCommand(Endpoints endpoints,
PrintThreadStackTraceCommand command);
+
+ /**
+ * Event processor for {@link ReconnectEndpointsCommand}.
+ */
+ void onReconnectEndpointsCommand(Endpoints endpoints,
ReconnectEndpointsCommand command);
}
diff --git a/java/pom.xml b/java/pom.xml
index c755d70b..81a4762b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -48,7 +48,7 @@
~ 1. Whether it is essential, because the current shaded jar is
fat enough.
~ 2. Make sure that it is compatible with Java 8.
-->
- <rocketmq-proto.version>2.0.4</rocketmq-proto.version>
+ <rocketmq-proto.version>2.0.5</rocketmq-proto.version>
<annotations-api.version>1.3.5</annotations-api.version>
<protobuf.version>3.24.4</protobuf.version>
<grpc.version>1.50.0</grpc.version>