This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a7d43d18 [ISSUE #9470] [Java] Support ReconnectEndpointsCommand (#1018)
a7d43d18 is described below

commit a7d43d18815e9cb666f659adfb045326bd8fc347
Author: ymwneu <[email protected]>
AuthorDate: Thu Jul 3 17:57:25 2025 +0800

    [ISSUE #9470] [Java] Support ReconnectEndpointsCommand (#1018)
---
 .../org/apache/rocketmq/client/java/impl/ClientImpl.java  | 15 +++++++++++++++
 .../rocketmq/client/java/impl/ClientSessionImpl.java      |  9 +++++++++
 .../client/java/impl/producer/ClientSessionHandler.java   |  6 ++++++
 java/pom.xml                                              |  2 +-
 4 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 4e5673cb..80e943fe 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -27,6 +27,7 @@ import apache.rocketmq.v2.NotifyClientTerminationRequest;
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.QueryRouteRequest;
 import apache.rocketmq.v2.QueryRouteResponse;
+import apache.rocketmq.v2.ReconnectEndpointsCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Resource;
 import apache.rocketmq.v2.Status;
@@ -128,6 +129,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     private final ReadWriteLock sessionsLock;
 
     private final CompositedMessageInterceptor compositedMessageInterceptor;
+    private boolean receiveReconnect = false;
 
     public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
         this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null");
@@ -279,6 +281,11 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         return !totalRouteEndpoints.contains(endpoints);
     }
 
+    @Override
+    public void onReconnectEndpointsCommand(Endpoints endpoints, ReconnectEndpointsCommand command) {
+        receiveReconnect = true;
+    }
+
     /**
      * This method is invoked while request of printing thread stack trace is received from remote.
      *
@@ -517,6 +524,14 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         return clientId;
     }
 
+    public boolean isReceiveReconnect() {
+        return receiveReconnect;
+    }
+
+    public void setReceiveReconnect(boolean receiveReconnect) {
+        this.receiveReconnect = receiveReconnect;
+    }
+
     /**
      * @see Client#doHeartbeat()
      */
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 8ec03132..29dd770c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.java.impl;
 
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.ReconnectEndpointsCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
@@ -156,6 +157,14 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
                     sessionHandler.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
                     break;
                 }
+                case RECONNECT_ENDPOINTS_COMMAND: {
+                    final ReconnectEndpointsCommand reconnectEndpointsCommand =
+                            command.getReconnectEndpointsCommand();
+                    log.info("Receive reconnect endpoints command from remote, endpoints={}, clientId={}",
+                            endpoints, clientId);
+                    sessionHandler.onReconnectEndpointsCommand(endpoints, reconnectEndpointsCommand);
+                    break;
+                }
                 default:
                     log.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
                         endpoints, command, clientId);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 9e68692a..b6f9c559 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.java.impl.producer;
 
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.ReconnectEndpointsCommand;
 import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
@@ -97,4 +98,9 @@ public interface ClientSessionHandler {
      * Event processor for {@link TelemetryCommand}.
      */
     void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command);
+
+    /**
+     * Event processor for {@link ReconnectEndpointsCommand}.
+     */
+    void onReconnectEndpointsCommand(Endpoints endpoints, ReconnectEndpointsCommand command);
 }
diff --git a/java/pom.xml b/java/pom.xml
index c755d70b..81a4762b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -48,7 +48,7 @@
            ~  1. Whether it is essential, because the current shaded jar is fat enough.
            ~  2. Make sure that it is compatible with Java 8.
          -->
-        <rocketmq-proto.version>2.0.4</rocketmq-proto.version>
+        <rocketmq-proto.version>2.0.5</rocketmq-proto.version>
         <annotations-api.version>1.3.5</annotations-api.version>
         <protobuf.version>3.24.4</protobuf.version>
         <grpc.version>1.50.0</grpc.version>

Reply via email to