This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 0c837c6e [ISSUE #1055] [Golang] Enhance grpc failover resilience
(#1069)
0c837c6e is described below
commit 0c837c6e0ea273867c3b9cf7fdf4d894ee5f45e3
Author: takagi <[email protected]>
AuthorDate: Thu Sep 4 11:38:43 2025 +0800
[ISSUE #1055] [Golang] Enhance grpc failover resilience (#1069)
Co-authored-by: weilin <[email protected]>
---
golang/client.go | 28 +++++++--------
golang/client_manager.go | 10 ++++--
golang/client_manager_test.go | 4 ++-
golang/name_resolver.go | 82 +++++++++++++++++++++++++++++++++++++++++++
golang/pkg/utils/utils.go | 26 ++++++++++++++
5 files changed, 131 insertions(+), 19 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index eb7bed4f..bdc7214d 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -348,15 +348,13 @@ func (cli *defaultClient) getMessageQueues(ctx
context.Context, topic string) ([
// telemeter to all messageQueues
endpointsSet := make(map[string]bool)
for _, messageQueue := range route {
- for _, address := range
messageQueue.GetBroker().GetEndpoints().GetAddresses() {
- target := utils.ParseAddress(address)
- if _, ok := endpointsSet[target]; ok {
- continue
- }
- endpointsSet[target] = true
- if err = cli.mustSyncSettingsToTargert(target); err !=
nil {
- return nil, err
- }
+ target :=
utils.EndpointsToString(messageQueue.GetBroker().GetEndpoints())
+ if _, ok := endpointsSet[target]; ok {
+ continue
+ }
+ endpointsSet[target] = true
+ if err = cli.mustSyncSettingsToTargert(target); err != nil {
+ return nil, err
}
}
@@ -401,14 +399,12 @@ func (cli *defaultClient) getTotalTargets() []string {
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
- for _, address := range
messageQueue.GetBroker().GetEndpoints().GetAddresses() {
- target := utils.ParseAddress(address)
- if _, ok := endpointsSet[target]; ok {
- continue
- }
- endpointsSet[target] = true
- endpoints = append(endpoints, target)
+ target :=
utils.EndpointsToString(messageQueue.GetBroker().GetEndpoints())
+ if _, ok := endpointsSet[target]; ok {
+ continue
}
+ endpointsSet[target] = true
+ endpoints = append(endpoints, target)
}
return true
})
diff --git a/golang/client_manager.go b/golang/client_manager.go
index 8e9b8a1d..6c7a1b84 100644
--- a/golang/client_manager.go
+++ b/golang/client_manager.go
@@ -19,6 +19,7 @@ package golang
import (
"context"
+ "fmt"
"sync"
"time"
@@ -183,8 +184,13 @@ func (cm *defaultClientManager) cleanRpcClient() {
}
}
func (cm *defaultClientManager) getRpcClient(endpoints *v2.Endpoints)
(RpcClient, error) {
- target := utils.ParseAddress(utils.SelectAnAddress(endpoints))
-
+ var target string
+ if endpoints.GetScheme() == v2.AddressScheme_IPv4 ||
endpoints.GetScheme() == v2.AddressScheme_IPv6 {
+ serviceName := utils.EndpointsToString(endpoints)
+ target = fmt.Sprintf("%s:///%s", DefaultScheme, serviceName)
+ } else {
+ target = utils.ParseAddress(utils.SelectAnAddress(endpoints))
+ }
cm.rpcClientTableLock.RLock()
item, ok := cm.rpcClientTable[target]
cm.rpcClientTableLock.RUnlock()
diff --git a/golang/client_manager_test.go b/golang/client_manager_test.go
index 14128848..9160342c 100644
--- a/golang/client_manager_test.go
+++ b/golang/client_manager_test.go
@@ -166,11 +166,13 @@ func TestCMUnRegisterClient(t *testing.T) {
var (
fakeHost = "127.0.0.1"
fakePort int32 = 80
- fakeAddress = fmt.Sprintf("%s:%d", fakeHost, fakePort)
+ fakeScheme = "ip"
+ fakeAddress = fmt.Sprintf("%s:///%s:%d", fakeScheme, fakeHost,
fakePort)
)
func fakeEndpoints() *v2.Endpoints {
return &v2.Endpoints{
+ Scheme: v2.AddressScheme_IPv4,
Addresses: []*v2.Address{
{
Host: fakeHost,
diff --git a/golang/name_resolver.go b/golang/name_resolver.go
new file mode 100644
index 00000000..2daca316
--- /dev/null
+++ b/golang/name_resolver.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package golang
+
+import (
+ "google.golang.org/grpc/resolver"
+ "math/rand"
+ "strings"
+ "time"
+)
+
+type rocketmqResolverBuilder struct{}
+
+const (
+ DefaultScheme = "ip"
+)
+
+func (b *rocketmqResolverBuilder) Build(target resolver.Target, cc
resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
+ endpoints := target.Endpoint()
+ r := &RocketmqResolver{
+ target: target,
+ cc: cc,
+ addrsStore: map[string][]string{
+ target.Endpoint(): b.splitEndpoints(endpoints),
+ },
+ }
+ r.start()
+ return r, nil
+}
+
+func (b *rocketmqResolverBuilder) splitEndpoints(endpoints string) []string {
+ result := []string{}
+ for _, ep := range strings.Split(endpoints, ";") {
+ ep = strings.TrimSpace(ep)
+ if ep != "" {
+ result = append(result, ep)
+ }
+ }
+ rand.Seed(time.Now().UnixNano())
+ rand.Shuffle(len(result), func(i, j int) {
+ result[i], result[j] = result[j], result[i]
+ })
+ return result
+}
+
+func (b *rocketmqResolverBuilder) Scheme() string { return DefaultScheme }
+
+type RocketmqResolver struct {
+ target resolver.Target
+ cc resolver.ClientConn
+ addrsStore map[string][]string
+}
+
+func (r *RocketmqResolver) start() {
+ addrStrs := r.addrsStore[r.target.Endpoint()]
+ addrs := make([]resolver.Address, len(addrStrs))
+ for i, s := range addrStrs {
+ addrs[i] = resolver.Address{Addr: s}
+ }
+ r.cc.UpdateState(resolver.State{Addresses: addrs})
+}
+func (r *RocketmqResolver) ResolveNow(resolver.ResolveNowOptions) {}
+func (r *RocketmqResolver) Close() {}
+
+func init() {
+ resolver.Register(&rocketmqResolverBuilder{})
+}
diff --git a/golang/pkg/utils/utils.go b/golang/pkg/utils/utils.go
index 7ba7d006..f9f29baa 100644
--- a/golang/pkg/utils/utils.go
+++ b/golang/pkg/utils/utils.go
@@ -30,6 +30,7 @@ import (
"net/url"
"os"
"runtime"
+ "sort"
"strconv"
"strings"
"sync"
@@ -87,6 +88,9 @@ func ParseAddress(address *v2.Address) string {
}
func ParseTarget(target string) (*v2.Endpoints, error) {
+ if strings.HasPrefix(target, "ip:///") {
+ target = strings.TrimPrefix(target, "ip:///")
+ }
ret := &v2.Endpoints{
Scheme: v2.AddressScheme_DOMAIN_NAME,
}
@@ -233,6 +237,28 @@ func GenClientID() string {
nanotime := time.Now().UnixNano() / 1000
return fmt.Sprintf("%s@%d@%d@%s", hostName, processID, nextIdx,
strconv.FormatInt(nanotime, 36))
}
+func EndpointsToString(endpoints *v2.Endpoints) string {
+ if endpoints == nil {
+ return ""
+ }
+ var sb strings.Builder
+ addresses := endpoints.GetAddresses()
+ sort.Slice(addresses, func(i, j int) bool {
+ ip1 := net.ParseIP(addresses[i].Host).String()
+ ip2 := net.ParseIP(addresses[j].Host).String()
+ if ip1 == ip2 {
+ return addresses[i].Port < addresses[j].Port
+ }
+ return ip1 < ip2
+ })
+ for i, addr := range addresses {
+ sb.WriteString(fmt.Sprintf("%s:%d", addr.Host, addr.Port))
+ if i != len(addresses)-1 {
+ sb.WriteString(";")
+ }
+ }
+ return sb.String()
+}
func SelectAnAddress(endpoints *v2.Endpoints) *v2.Address {
if endpoints == nil {