This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c837c6e [ISSUE #1055] [Golang] Enhance grpc failover resilience 
(#1069)
0c837c6e is described below

commit 0c837c6e0ea273867c3b9cf7fdf4d894ee5f45e3
Author: takagi <[email protected]>
AuthorDate: Thu Sep 4 11:38:43 2025 +0800

    [ISSUE #1055] [Golang] Enhance grpc failover resilience (#1069)
    
    Co-authored-by: weilin <[email protected]>
---
 golang/client.go              | 28 +++++++--------
 golang/client_manager.go      | 10 ++++--
 golang/client_manager_test.go |  4 ++-
 golang/name_resolver.go       | 82 +++++++++++++++++++++++++++++++++++++++++++
 golang/pkg/utils/utils.go     | 26 ++++++++++++++
 5 files changed, 131 insertions(+), 19 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index eb7bed4f..bdc7214d 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -348,15 +348,13 @@ func (cli *defaultClient) getMessageQueues(ctx 
context.Context, topic string) ([
        // telemeter to all messageQueues
        endpointsSet := make(map[string]bool)
        for _, messageQueue := range route {
-               for _, address := range 
messageQueue.GetBroker().GetEndpoints().GetAddresses() {
-                       target := utils.ParseAddress(address)
-                       if _, ok := endpointsSet[target]; ok {
-                               continue
-                       }
-                       endpointsSet[target] = true
-                       if err = cli.mustSyncSettingsToTargert(target); err != 
nil {
-                               return nil, err
-                       }
+               target := 
utils.EndpointsToString(messageQueue.GetBroker().GetEndpoints())
+               if _, ok := endpointsSet[target]; ok {
+                       continue
+               }
+               endpointsSet[target] = true
+               if err = cli.mustSyncSettingsToTargert(target); err != nil {
+                       return nil, err
                }
        }
 
@@ -401,14 +399,12 @@ func (cli *defaultClient) getTotalTargets() []string {
        cli.router.Range(func(_, v interface{}) bool {
                messageQueues := v.([]*v2.MessageQueue)
                for _, messageQueue := range messageQueues {
-                       for _, address := range 
messageQueue.GetBroker().GetEndpoints().GetAddresses() {
-                               target := utils.ParseAddress(address)
-                               if _, ok := endpointsSet[target]; ok {
-                                       continue
-                               }
-                               endpointsSet[target] = true
-                               endpoints = append(endpoints, target)
+                       target := 
utils.EndpointsToString(messageQueue.GetBroker().GetEndpoints())
+                       if _, ok := endpointsSet[target]; ok {
+                               continue
                        }
+                       endpointsSet[target] = true
+                       endpoints = append(endpoints, target)
                }
                return true
        })
diff --git a/golang/client_manager.go b/golang/client_manager.go
index 8e9b8a1d..6c7a1b84 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -19,6 +19,7 @@ package golang
 
 import (
        "context"
+       "fmt"
        "sync"
        "time"
 
@@ -183,8 +184,13 @@ func (cm *defaultClientManager) cleanRpcClient() {
        }
 }
 func (cm *defaultClientManager) getRpcClient(endpoints *v2.Endpoints) 
(RpcClient, error) {
-       target := utils.ParseAddress(utils.SelectAnAddress(endpoints))
-
+       var target string
+       if endpoints.GetScheme() == v2.AddressScheme_IPv4 || 
endpoints.GetScheme() == v2.AddressScheme_IPv6 {
+               serviceName := utils.EndpointsToString(endpoints)
+               target = fmt.Sprintf("%s:///%s", DefaultScheme, serviceName)
+       } else {
+               target = utils.ParseAddress(utils.SelectAnAddress(endpoints))
+       }
        cm.rpcClientTableLock.RLock()
        item, ok := cm.rpcClientTable[target]
        cm.rpcClientTableLock.RUnlock()
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 14128848..9160342c 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -166,11 +166,13 @@ func TestCMUnRegisterClient(t *testing.T) {
 var (
        fakeHost          = "127.0.0.1"
        fakePort    int32 = 80
-       fakeAddress       = fmt.Sprintf("%s:%d", fakeHost, fakePort)
+       fakeScheme        = "ip"
+       fakeAddress       = fmt.Sprintf("%s:///%s:%d", fakeScheme, fakeHost, 
fakePort)
 )
 
 func fakeEndpoints() *v2.Endpoints {
        return &v2.Endpoints{
+               Scheme: v2.AddressScheme_IPv4,
                Addresses: []*v2.Address{
                        {
                                Host: fakeHost,
diff --git a/golang/name_resolver.go b/golang/name_resolver.go
new file mode 100644
index 00000000..2daca316
--- /dev/null
+++ b/golang/name_resolver.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+       "google.golang.org/grpc/resolver"
+       "math/rand"
+       "strings"
+       "time"
+)
+
+type rocketmqResolverBuilder struct{}
+
+const (
+       DefaultScheme = "ip"
+)
+
+func (b *rocketmqResolverBuilder) Build(target resolver.Target, cc 
resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
+       endpoints := target.Endpoint()
+       r := &RocketmqResolver{
+               target: target,
+               cc:     cc,
+               addrsStore: map[string][]string{
+                       target.Endpoint(): b.splitEndpoints(endpoints),
+               },
+       }
+       r.start()
+       return r, nil
+}
+
+func (b *rocketmqResolverBuilder) splitEndpoints(endpoints string) []string {
+       result := []string{}
+       for _, ep := range strings.Split(endpoints, ";") {
+               ep = strings.TrimSpace(ep)
+               if ep != "" {
+                       result = append(result, ep)
+               }
+       }
+       rand.Seed(time.Now().UnixNano())
+       rand.Shuffle(len(result), func(i, j int) {
+               result[i], result[j] = result[j], result[i]
+       })
+       return result
+}
+
+func (b *rocketmqResolverBuilder) Scheme() string { return DefaultScheme }
+
+type RocketmqResolver struct {
+       target     resolver.Target
+       cc         resolver.ClientConn
+       addrsStore map[string][]string
+}
+
+func (r *RocketmqResolver) start() {
+       addrStrs := r.addrsStore[r.target.Endpoint()]
+       addrs := make([]resolver.Address, len(addrStrs))
+       for i, s := range addrStrs {
+               addrs[i] = resolver.Address{Addr: s}
+       }
+       r.cc.UpdateState(resolver.State{Addresses: addrs})
+}
+func (r *RocketmqResolver) ResolveNow(resolver.ResolveNowOptions) {}
+func (r *RocketmqResolver) Close()                                {}
+
+func init() {
+       resolver.Register(&rocketmqResolverBuilder{})
+}
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index 7ba7d006..f9f29baa 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -30,6 +30,7 @@ import (
        "net/url"
        "os"
        "runtime"
+       "sort"
        "strconv"
        "strings"
        "sync"
@@ -87,6 +88,9 @@ func ParseAddress(address *v2.Address) string {
 }
 
 func ParseTarget(target string) (*v2.Endpoints, error) {
+       if strings.HasPrefix(target, "ip:///") {
+               target = strings.TrimPrefix(target, "ip:///")
+       }
        ret := &v2.Endpoints{
                Scheme: v2.AddressScheme_DOMAIN_NAME,
        }
@@ -233,6 +237,28 @@ func GenClientID() string {
        nanotime := time.Now().UnixNano() / 1000
        return fmt.Sprintf("%s@%d@%d@%s", hostName, processID, nextIdx, 
strconv.FormatInt(nanotime, 36))
 }
+func EndpointsToString(endpoints *v2.Endpoints) string {
+       if endpoints == nil {
+               return ""
+       }
+       var sb strings.Builder
+       addresses := endpoints.GetAddresses()
+       sort.Slice(addresses, func(i, j int) bool {
+               ip1 := net.ParseIP(addresses[i].Host).String()
+               ip2 := net.ParseIP(addresses[j].Host).String()
+               if ip1 == ip2 {
+                       return addresses[i].Port < addresses[j].Port
+               }
+               return ip1 < ip2
+       })
+       for i, addr := range addresses {
+               sb.WriteString(fmt.Sprintf("%s:%d", addr.Host, addr.Port))
+               if i != len(addresses)-1 {
+                       sb.WriteString(";")
+               }
+       }
+       return sb.String()
+}
 
 func SelectAnAddress(endpoints *v2.Endpoints) *v2.Address {
        if endpoints == nil {

Reply via email to