This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new d0db3403 improve: service URL parsing compatible with Go 1.26 (#1468)
d0db3403 is described below
commit d0db3403023d3390392db8562741691482af6df4
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Mar 21 12:35:00 2026 +0800
improve: service URL parsing compatible with Go 1.26 (#1468)
* improve: service URL parsing compatible with Go 1.26
* ci: add Go 1.26 support
* refactor: remove ResolveHostURI method and update tests for service URL
resolution
* improve: enhance service URL parsing and add tests for unsupported schemes
* Update pulsar/internal/service_uri.go
Co-authored-by: Copilot <[email protected]>
* improve: simplify service URL parsing by removing unnecessary URI
conversion
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: crossoverJie <[email protected]>
---
.github/workflows/ci.yml | 8 +-
Dockerfile | 2 +-
Makefile | 2 +-
README.md | 4 +-
pulsar/client_impl.go | 19 +-
pulsar/client_impl_test.go | 6 +
pulsar/internal/http_client.go | 4 +-
pulsar/internal/lookup_service.go | 8 +-
pulsar/internal/lookup_service_test.go | 86 ++++----
pulsar/internal/rpc_client.go | 34 ++-
pulsar/internal/rpc_client_test.go | 10 +-
pulsar/internal/service_name_resolver.go | 36 +---
pulsar/internal/service_name_resolver_test.go | 82 ++++----
pulsar/internal/service_uri.go | 286 +++++++++++++++++---------
pulsar/internal/service_uri_test.go | 59 +++++-
15 files changed, 368 insertions(+), 278 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f6634eaf..30c18fe3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- go-version: [ '1.23', '1.24' ]
+ go-version: [ '1.24', '1.26']
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v5
@@ -36,7 +36,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v5
with:
- go-version: '1.23'
+ go-version: '1.24'
- name: Check license header
run: docker run --rm -v $(pwd):/github/workspace
ghcr.io/korandoru/hawkeye-native:v3 check
- name: Run golangci-lint
@@ -48,7 +48,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- go-version: [ '1.23', '1.24' ]
+ go-version: [ '1.24', '1.26' ]
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
@@ -57,6 +57,8 @@ jobs:
- name: Run Tests
id: run-tests
run: make test GO_VERSION=${{ matrix.go-version }}
+ env:
+ GODEBUG: urlstrictcolons=1
- name: Upload logs on failure
if: failure() && steps.run-tests.outcome == 'failure'
uses: actions/upload-artifact@v4
diff --git a/Dockerfile b/Dockerfile
index 8716f737..44a10042 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,7 +19,7 @@
# set via the Makefile or CLI
ARG PULSAR_IMAGE=apachepulsar/pulsar:latest
-ARG GO_VERSION=1.23
+ARG GO_VERSION=1.24
FROM golang:$GO_VERSION as golang
FROM $PULSAR_IMAGE
diff --git a/Makefile b/Makefile
index bee9902a..7cdf21fd 100644
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,7 @@
IMAGE_NAME = pulsar-client-go-test:latest
PULSAR_VERSION ?= latest
PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
-GO_VERSION ?= 1.23
+GO_VERSION ?= 1.24
CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/)
# Golang standard bin directory.
diff --git a/README.md b/README.md
index c5e6d1c1..a3000b29 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ CGo-based library.
## Requirements
-- Go 1.23+
+- Go 1.24+
## Status
@@ -148,7 +148,7 @@ Run the tests:
Run the tests with specific versions of GOLANG and PULSAR:
- make test GO_VERSION=1.23 PULSAR_VERSION=4.0.3
+ make test GO_VERSION=1.24 PULSAR_VERSION=4.0.3
## Contributing
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index d940367f..f1cfeb04 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -19,7 +19,6 @@ package pulsar
import (
"fmt"
- "net/url"
"sync"
"time"
@@ -80,31 +79,31 @@ func newClient(options ClientOptions) (Client, error) {
return nil, newError(InvalidConfiguration, "URL is required for
client")
}
- url, err := url.Parse(options.URL)
+ pulsarServiceURI, err :=
internal.NewPulsarServiceURIFromURIString(options.URL)
if err != nil {
logger.WithError(err).Error("Failed to parse service URL")
return nil, newError(InvalidConfiguration, "Invalid service
URL")
}
var tlsConfig *internal.TLSOptions
- switch url.Scheme {
- case "pulsar", "http":
- tlsConfig = nil
- case "pulsar+ssl", "https":
+ if pulsarServiceURI.UseTLS() {
+ hostName, err := pulsarServiceURI.PrimaryHostName()
+ if err != nil {
+ return nil, err
+ }
+
tlsConfig = &internal.TLSOptions{
AllowInsecureConnection:
options.TLSAllowInsecureConnection,
KeyFile: options.TLSKeyFilePath,
CertFile: options.TLSCertificateFile,
TrustCertsFilePath: options.TLSTrustCertsFilePath,
ValidateHostname: options.TLSValidateHostname,
- ServerName: url.Hostname(),
+ ServerName: hostName,
CipherSuites: options.TLSCipherSuites,
MinVersion: options.TLSMinVersion,
MaxVersion: options.TLSMaxVersion,
TLSConfig: options.TLSConfig,
}
- default:
- return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid
URL scheme '%s'", url.Scheme))
}
var authProvider auth.Provider
@@ -175,7 +174,7 @@ func newClient(options ClientOptions) (Client, error) {
tlsEnabled: tlsConfig != nil,
}
- c.rpcClient, err = internal.NewRPCClient(url, c.cnxPool,
operationTimeout, logger, metrics,
+ c.rpcClient, err = internal.NewRPCClient(options.URL, c.cnxPool,
operationTimeout, logger, metrics,
options.ListenerName, tlsConfig, authProvider,
toKeyValues(options.LookupProperties))
if err != nil {
return nil, err
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 7a8c8e60..30754044 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -46,6 +46,12 @@ func TestClient(t *testing.T) {
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
}
+func TestClientInvalidScheme(t *testing.T) {
+ client, err := NewClient(ClientOptions{URL: "ftp://localhost:21"})
+ require.Error(t, err)
+ assert.Nil(t, client)
+}
+
func TestTLSConnectionCAError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURLTLS,
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index b7dea1fe..38912206 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -59,13 +59,13 @@ type HTTPClient interface {
Closable
}
-func NewHTTPClient(serviceURL *url.URL, serviceNameResolver
ServiceNameResolver, tlsConfig *TLSOptions,
+func NewHTTPClient(serviceNameResolver ServiceNameResolver, tlsConfig
*TLSOptions,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
authProvider auth.Provider) (HTTPClient, error) {
h := &httpClient{
ServiceNameResolver: serviceNameResolver,
requestTimeout: requestTimeout,
- log: logger.SubLogger(log.Fields{"serviceURL":
serviceURL}),
+ log: logger.SubLogger(log.Fields{"serviceURL":
serviceNameResolver.GetServiceURI().URL}),
metrics: metrics,
}
c := &http.Client{Timeout: requestTimeout}
diff --git a/pulsar/internal/lookup_service.go
b/pulsar/internal/lookup_service.go
index 19e51b41..876a1021 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -92,14 +92,14 @@ type lookupService struct {
}
// NewLookupService init a lookup service struct and return an object of
LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
serviceNameResolver ServiceNameResolver,
+func NewLookupService(rpcClient RPCClient, serviceNameResolver
ServiceNameResolver,
tlsEnabled bool, listenerName string,
lookupProperties []*pb.KeyValue, logger log.Logger, metrics *Metrics)
LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
- log: logger.SubLogger(log.Fields{"serviceURL":
serviceURL}),
+ log: logger.SubLogger(log.Fields{"serviceURL":
serviceNameResolver.GetServiceURI().URL}),
lookupProperties: lookupProperties,
metrics: metrics,
listenerName: listenerName,
@@ -446,14 +446,14 @@ func (h *httpLookupService) Close() {
}
// NewHTTPLookupService init a http based lookup service struct and return an
object of LookupService.
-func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL,
serviceNameResolver ServiceNameResolver,
+func NewHTTPLookupService(httpClient HTTPClient, serviceNameResolver
ServiceNameResolver,
tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
return &httpLookupService{
httpClient: httpClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
- log: logger.SubLogger(log.Fields{"serviceURL":
serviceURL}),
+ log: logger.SubLogger(log.Fields{"serviceURL":
serviceNameResolver.GetServiceURI().URL}),
metrics: metrics,
}
}
diff --git a/pulsar/internal/lookup_service_test.go
b/pulsar/internal/lookup_service_test.go
index b8467137..e12eec4b 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -28,6 +28,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -131,9 +132,8 @@ func responseType(r
pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT
}
func TestLookupSuccess(t *testing.T) {
- url, err := url.Parse("pulsar://example:6650")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://example:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -157,7 +157,7 @@ func TestLookupSuccess(t *testing.T) {
},
}
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, serviceNameResolver, false,
"", kvs, log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, false, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -168,9 +168,8 @@ func TestLookupSuccess(t *testing.T) {
}
func TestTlsLookupSuccess(t *testing.T) {
- url, err := url.Parse("pulsar+ssl://example:6651")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar+ssl://example:6651")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -195,7 +194,7 @@ func TestTlsLookupSuccess(t *testing.T) {
}
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, serviceNameResolver, true,
"", kvs, log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, true, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -206,9 +205,8 @@ func TestTlsLookupSuccess(t *testing.T) {
}
func TestLookupWithProxy(t *testing.T) {
- url, err := url.Parse("pulsar://example:6650")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://example:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -233,7 +231,7 @@ func TestLookupWithProxy(t *testing.T) {
},
}
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, serviceNameResolver, false,
"", kvs, log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, false, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -244,8 +242,8 @@ func TestLookupWithProxy(t *testing.T) {
}
func TestTlsLookupWithProxy(t *testing.T) {
- url, err := url.Parse("pulsar+ssl://example:6651")
- assert.NoError(t, err)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar+ssl://example:6651")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -269,9 +267,8 @@ func TestTlsLookupWithProxy(t *testing.T) {
},
},
}
- resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, resolver, true, "", kvs,
log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, true, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -282,8 +279,8 @@ func TestTlsLookupWithProxy(t *testing.T) {
}
func TestLookupWithRedirect(t *testing.T) {
- url, err := url.Parse("pulsar://example:6650")
- assert.NoError(t, err)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://example:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -319,9 +316,8 @@ func TestLookupWithRedirect(t *testing.T) {
},
},
}
- resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, resolver, false, "", kvs,
log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, false, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -332,8 +328,8 @@ func TestLookupWithRedirect(t *testing.T) {
}
func TestTlsLookupWithRedirect(t *testing.T) {
- url, err := url.Parse("pulsar+ssl://example:6651")
- assert.NoError(t, err)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar+ssl://example:6651")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -370,9 +366,8 @@ func TestTlsLookupWithRedirect(t *testing.T) {
},
}
- resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, resolver, true, "", kvs,
log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, true, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -383,8 +378,8 @@ func TestTlsLookupWithRedirect(t *testing.T) {
}
func TestLookupWithInvalidUrlResponse(t *testing.T) {
- url, err := url.Parse("pulsar://example:6650")
- assert.NoError(t, err)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://example:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -408,9 +403,8 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
},
},
}
- resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, resolver, false, "", kvs,
log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, false, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -418,8 +412,8 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
}
func TestLookupWithLookupFailure(t *testing.T) {
- url, err := url.Parse("pulsar://example:6650")
- assert.NoError(t, err)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://example:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
@@ -442,9 +436,8 @@ func TestLookupWithLookupFailure(t *testing.T) {
},
}
- resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer)
- ls := NewLookupService(mockedClient, url, resolver, false, "", kvs,
log.DefaultNopLogger(), metricsProvider)
+ ls := NewLookupService(mockedClient, serviceNameResolver, false, "",
kvs, log.DefaultNopLogger(), metricsProvider)
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -526,9 +519,8 @@ func (m *mockedPartitionedTopicMetadataRPCClient)
LookupService(_ string) (Looku
}
func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
- url, err := url.Parse("pulsar://example:6650")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://example:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
@@ -547,7 +539,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response:
pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
- }, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
+ }, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer))
metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
@@ -557,10 +549,8 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
}
func TestLookupSuccessWithMultipleHosts(t *testing.T) {
- url, err := url.Parse("pulsar://host1,host2,host3:6650")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
-
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("pulsar://host1:6650,host2:6650,host3:6650")
+ require.NoError(t, err)
kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,
@@ -582,7 +572,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
BrokerServiceUrl:
proto.String("pulsar://broker-1:6650"),
},
},
- }, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
+ }, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{},
prometheus.DefaultRegisterer))
lr, err := ls.Lookup("my-topic")
@@ -639,11 +629,10 @@ func NewMockHTTPClient(serviceNameResolver
ServiceNameResolver) HTTPClient {
}
func TestHttpLookupSuccess(t *testing.T) {
- url, err := url.Parse("http://broker-1:8080")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("http://broker-1:8080")
+ require.NoError(t, err)
httpClient := NewMockHTTPClient(serviceNameResolver)
- ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
+ ls := NewHTTPLookupService(httpClient, serviceNameResolver, false,
log.DefaultNopLogger(), NewMetricsProvider(4,
map[string]string{}, prometheus.DefaultRegisterer))
lr, err := ls.Lookup("my-topic")
@@ -655,11 +644,10 @@ func TestHttpLookupSuccess(t *testing.T) {
}
func TestHttpGetPartitionedTopicMetadataSuccess(t *testing.T) {
- url, err := url.Parse("http://broker-1:8080")
- assert.NoError(t, err)
- serviceNameResolver := NewPulsarServiceNameResolver(url)
+ serviceNameResolver, err :=
NewPulsarServiceNameResolver("http://broker-1:8080")
+ require.NoError(t, err)
httpClient := NewMockHTTPClient(serviceNameResolver)
- ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
+ ls := NewHTTPLookupService(httpClient, serviceNameResolver, false,
log.DefaultNopLogger(), NewMetricsProvider(4,
map[string]string{}, prometheus.DefaultRegisterer))
tMetadata, err := ls.GetPartitionedTopicMetadata("my-topic")
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 68cf1347..85e048ef 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -94,7 +94,7 @@ type rpcClient struct {
lookupProperties []*pb.KeyValue
}
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+func NewRPCClient(serviceURL string, pool ConnectionPool,
requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider,
lookupProperties []*pb.KeyValue) (RPCClient, error) {
@@ -109,7 +109,7 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
urlLookupServiceMap: make(map[string]LookupService),
lookupProperties: lookupProperties,
}
- lookupService, err := c.NewLookupService(serviceURL)
+ lookupService, err := c.LookupService(serviceURL)
if err != nil {
return nil, fmt.Errorf("failed to create lookup service: %w",
err)
}
@@ -232,12 +232,7 @@ func (c *rpcClient) LookupService(URL string)
(LookupService, error) {
return lookupService, nil
}
- serviceURL, err := url.Parse(URL)
- if err != nil {
- return nil, fmt.Errorf("failed to parse URL '%s': %w", URL, err)
- }
-
- lookupService, err = c.NewLookupService(serviceURL)
+ lookupService, err := c.newLookupService(URL)
if err != nil {
return nil, fmt.Errorf("failed to create lookup service for URL
'%s': %w", URL, err)
}
@@ -245,26 +240,25 @@ func (c *rpcClient) LookupService(URL string)
(LookupService, error) {
return lookupService, nil
}
-func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
+func (c *rpcClient) newLookupService(serviceURL string) (LookupService, error)
{
+ serviceNameResolver, err := NewPulsarServiceNameResolver(serviceURL)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create service name resolver
for URL '%s': %w", serviceURL, err)
+ }
- switch url.Scheme {
- case "pulsar", "pulsar+ssl":
- serviceNameResolver := NewPulsarServiceNameResolver(url)
- return NewLookupService(c, url, serviceNameResolver,
- c.tlsConfig != nil, c.listenerName, c.lookupProperties,
c.log, c.metrics), nil
- case "http", "https":
- serviceNameResolver := NewPulsarServiceNameResolver(url)
- httpClient, err := NewHTTPClient(url, serviceNameResolver,
c.tlsConfig,
+ if serviceNameResolver.GetServiceURI().IsHTTP() {
+ httpClient, err := NewHTTPClient(serviceNameResolver,
c.tlsConfig,
c.requestTimeout, c.log, c.metrics, c.authProvider)
if err != nil {
return nil, err
}
return NewHTTPLookupService(
- httpClient, url, serviceNameResolver, c.tlsConfig !=
nil, c.log, c.metrics), nil
- default:
- return nil, fmt.Errorf("invalid URL scheme '%s'", url.Scheme)
+ httpClient, serviceNameResolver, c.tlsConfig != nil,
c.log, c.metrics), nil
}
+
+ return NewLookupService(c, serviceNameResolver,
+ c.tlsConfig != nil, c.listenerName, c.lookupProperties, c.log,
c.metrics), nil
}
func (c *rpcClient) Close() {
diff --git a/pulsar/internal/rpc_client_test.go
b/pulsar/internal/rpc_client_test.go
index 08a2013e..e9623d5a 100644
--- a/pulsar/internal/rpc_client_test.go
+++ b/pulsar/internal/rpc_client_test.go
@@ -18,7 +18,6 @@
package internal
import (
- "net/url"
"testing"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -27,34 +26,31 @@ import (
func TestNewRPCClient_InvalidURL_ShouldNotPanic(t *testing.T) {
// Test that NewRPCClient doesn't panic with invalid URL
- invalidURL, _ := url.Parse("invalid://scheme")
+ invalidURL := "invalid://scheme"
// This should not panic and should return an error
_, err := NewRPCClient(invalidURL, nil, 0, log.DefaultNopLogger(), nil,
"", nil, nil, nil)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "invalid URL scheme")
}
func TestLookupService_InvalidURL_ShouldNotPanic(t *testing.T) {
// Create a minimal RPC client for testing
- validURL, _ := url.Parse("pulsar://localhost:6650")
+ validURL := "pulsar://localhost:6650"
rpcClient, err := NewRPCClient(validURL, nil, 0,
log.DefaultNopLogger(), nil, "", nil, nil, nil)
assert.NoError(t, err)
// Test that LookupService doesn't panic with invalid URL
_, err = rpcClient.LookupService("invalid://url")
assert.Error(t, err)
- assert.Contains(t, err.Error(), "invalid URL scheme")
}
func TestLookupService_InvalidScheme_ShouldNotPanic(t *testing.T) {
// Create a minimal RPC client for testing
- validURL, _ := url.Parse("pulsar://localhost:6650")
+ validURL := "pulsar://localhost:6650"
rpcClient, err := NewRPCClient(validURL, nil, 0,
log.DefaultNopLogger(), nil, "", nil, nil, nil)
assert.NoError(t, err)
// Test that LookupService doesn't panic with invalid scheme
_, err = rpcClient.LookupService("ftp://localhost:21")
assert.Error(t, err)
- assert.Contains(t, err.Error(), "invalid URL scheme")
}
diff --git a/pulsar/internal/service_name_resolver.go
b/pulsar/internal/service_name_resolver.go
index a8fabb22..29a57edc 100644
--- a/pulsar/internal/service_name_resolver.go
+++ b/pulsar/internal/service_name_resolver.go
@@ -30,16 +30,13 @@ import (
type ServiceNameResolver interface {
ResolveHost() (*url.URL, error)
- ResolveHostURI() (*PulsarServiceURI, error)
- UpdateServiceURL(url *url.URL) error
+ UpdateServiceURL(serviceURL string) error
GetServiceURI() *PulsarServiceURI
- GetServiceURL() *url.URL
GetAddressList() []*url.URL
}
type pulsarServiceNameResolver struct {
ServiceURI *PulsarServiceURI
- ServiceURL *url.URL
CurrentIndex int32
AddressList []*url.URL
@@ -47,13 +44,12 @@ type pulsarServiceNameResolver struct {
mutex sync.Mutex
}
-func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
+func NewPulsarServiceNameResolver(serviceURL string) (ServiceNameResolver,
error) {
r := &pulsarServiceNameResolver{rnd:
rand.New(rand.NewSource(time.Now().UnixNano()))}
- err := r.UpdateServiceURL(url)
- if err != nil {
- log.Errorf("create pulsar service name resolver failed : %v",
err)
+ if len(serviceURL) > 0 {
+ return r, r.UpdateServiceURL(serviceURL)
}
- return r
+ return r, nil
}
func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
@@ -64,7 +60,7 @@ func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL,
error) {
return nil, errors.New("no service url is provided yet")
}
if len(r.AddressList) == 0 {
- return nil, fmt.Errorf("no hosts found for service url : %v",
r.ServiceURL)
+ return nil, fmt.Errorf("no hosts found for service url : %v",
r.ServiceURI)
}
if len(r.AddressList) == 1 {
return r.AddressList[0], nil
@@ -74,19 +70,10 @@ func (r *pulsarServiceNameResolver) ResolveHost()
(*url.URL, error) {
return r.AddressList[idx], nil
}
-func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI,
error) {
- host, err := r.ResolveHost()
+func (r *pulsarServiceNameResolver) UpdateServiceURL(serviceURL string) error {
+ uri, err := NewPulsarServiceURIFromURIString(serviceURL)
if err != nil {
- return nil, err
- }
- hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port()
- return NewPulsarServiceURIFromURIString(hostURL)
-}
-
-func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error {
- uri, err := NewPulsarServiceURIFromURL(u)
- if err != nil {
- log.Errorf("invalid service-url instance %s provided %v", u,
err)
+ log.Errorf("invalid service-url instance %s provided %v",
serviceURL, err)
return err
}
@@ -106,7 +93,6 @@ func (r *pulsarServiceNameResolver) UpdateServiceURL(u
*url.URL) error {
defer r.mutex.Unlock()
r.AddressList = addresses
- r.ServiceURL = u
r.ServiceURI = uri
r.CurrentIndex = int32(r.rnd.Intn(len(addresses)))
return nil
@@ -116,10 +102,6 @@ func (r *pulsarServiceNameResolver) GetServiceURI()
*PulsarServiceURI {
return r.ServiceURI
}
-func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL {
- return r.ServiceURL
-}
-
func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL {
return r.AddressList
}
diff --git a/pulsar/internal/service_name_resolver_test.go
b/pulsar/internal/service_name_resolver_test.go
index e1906339..51061c6a 100644
--- a/pulsar/internal/service_name_resolver_test.go
+++ b/pulsar/internal/service_name_resolver_test.go
@@ -22,40 +22,33 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestResolveBeforeUpdateServiceUrl(t *testing.T) {
- resolver := NewPulsarServiceNameResolver(nil)
+ resolver, err := NewPulsarServiceNameResolver("")
+ require.NoError(t, err)
u, err := resolver.ResolveHost()
assert.Nil(t, u)
assert.NotNil(t, err)
assert.EqualError(t, err, "no service url is provided yet")
}
-func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) {
- resolver := NewPulsarServiceNameResolver(nil)
- u, err := resolver.ResolveHostURI()
- assert.Nil(t, u)
- assert.NotNil(t, err)
- assert.EqualError(t, err, "no service url is provided yet")
-}
-
func TestUpdateInvalidServiceUrl(t *testing.T) {
- resolver := NewPulsarServiceNameResolver(nil)
- url, _ := url.Parse("pulsar:///")
- err := resolver.UpdateServiceURL(url)
+ resolver, err := NewPulsarServiceNameResolver("")
+ require.NoError(t, err)
+ err = resolver.UpdateServiceURL("pulsar:///")
assert.NotNil(t, err)
- assert.Empty(t, resolver.GetServiceURL())
assert.Nil(t, resolver.GetServiceURI())
}
func TestSimpleHostUrl(t *testing.T) {
- resolver := NewPulsarServiceNameResolver(nil)
- serviceURL, _ := url.Parse("pulsar://host1:6650")
- err := resolver.UpdateServiceURL(serviceURL)
+ resolver, err := NewPulsarServiceNameResolver("")
+ require.NoError(t, err)
+ serviceURL := "pulsar://host1:6650"
+ err = resolver.UpdateServiceURL(serviceURL)
assert.Nil(t, err)
- assert.Equal(t, serviceURL, resolver.GetServiceURL())
- expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+ expectedURI, err := NewPulsarServiceURIFromURIString(serviceURL)
assert.Nil(t, err)
assert.Equal(t, expectedURI, resolver.GetServiceURI())
actualHost, err := resolver.ResolveHost()
@@ -63,11 +56,10 @@ func TestSimpleHostUrl(t *testing.T) {
assert.Equal(t, "host1", actualHost.Hostname())
assert.Equal(t, "6650", actualHost.Port())
- newServiceURL, _ := url.Parse("pulsar://host2:6650")
+ newServiceURL := "pulsar://host2:6650"
err = resolver.UpdateServiceURL(newServiceURL)
assert.Nil(t, err)
- assert.Equal(t, newServiceURL, resolver.GetServiceURL())
- expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL)
+ expectedURI, err = NewPulsarServiceURIFromURIString(newServiceURL)
assert.Nil(t, err)
assert.Equal(t, expectedURI, resolver.GetServiceURI())
actualHost, err = resolver.ResolveHost()
@@ -77,55 +69,59 @@ func TestSimpleHostUrl(t *testing.T) {
}
func TestMultipleHostsUrl(t *testing.T) {
- resolver := NewPulsarServiceNameResolver(nil)
- serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650")
- err := resolver.UpdateServiceURL(serviceURL)
+ resolver, err := NewPulsarServiceNameResolver("")
+ require.NoError(t, err)
+ serviceURL := "pulsar://host1:6650,host2:6650"
+ err = resolver.UpdateServiceURL(serviceURL)
assert.Nil(t, err)
- assert.Equal(t, serviceURL, resolver.GetServiceURL())
- expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+ expectedURI, err := NewPulsarServiceURIFromURIString(serviceURL)
assert.Nil(t, err)
assert.Equal(t, expectedURI, resolver.GetServiceURI())
host1, _ := url.Parse("pulsar://host1:6650")
host2, _ := url.Parse("pulsar://host2:6650")
- host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650")
- host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650")
assert.Contains(t, resolver.GetAddressList(), host1)
assert.Contains(t, resolver.GetAddressList(), host2)
hosts := []*url.URL{host1, host2}
- hosturis := []*PulsarServiceURI{host1uri, host2uri}
for i := 0; i < 10; i++ {
host, err := resolver.ResolveHost()
assert.Nil(t, err)
- hosturi, err := resolver.ResolveHostURI()
- assert.Nil(t, err)
assert.Contains(t, hosts, host)
- assert.Contains(t, hosturis, hosturi)
}
}
func TestMultipleHostsTlsUrl(t *testing.T) {
- resolver := NewPulsarServiceNameResolver(nil)
- serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651")
- err := resolver.UpdateServiceURL(serviceURL)
+ resolver, err := NewPulsarServiceNameResolver("")
+ require.NoError(t, err)
+ serviceURL := "pulsar+ssl://host1:6651,host2:6651"
+ err = resolver.UpdateServiceURL(serviceURL)
assert.Nil(t, err)
- assert.Equal(t, serviceURL, resolver.GetServiceURL())
- expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+ expectedURI, err := NewPulsarServiceURIFromURIString(serviceURL)
assert.Nil(t, err)
assert.Equal(t, expectedURI, resolver.GetServiceURI())
host1, _ := url.Parse("pulsar+ssl://host1:6651")
host2, _ := url.Parse("pulsar+ssl://host2:6651")
- host1uri, _ :=
NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651")
- host2uri, _ :=
NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651")
assert.Contains(t, resolver.GetAddressList(), host1)
assert.Contains(t, resolver.GetAddressList(), host2)
hosts := []*url.URL{host1, host2}
- hosturis := []*PulsarServiceURI{host1uri, host2uri}
for i := 0; i < 10; i++ {
host, err := resolver.ResolveHost()
assert.Nil(t, err)
- hosturi, err := resolver.ResolveHostURI()
- assert.Nil(t, err)
assert.Contains(t, hosts, host)
- assert.Contains(t, hosturis, hosturi)
}
}
+
+func TestResolveIpv6Host(t *testing.T) {
+ resolver, err := NewPulsarServiceNameResolver("")
+ require.NoError(t, err)
+
+ serviceURL := "pulsar://[fec0:0:0:ffff::1]:6650"
+ err = resolver.UpdateServiceURL(serviceURL)
+ require.NoError(t, err)
+
+ actualHost, err := resolver.ResolveHost()
+ require.NoError(t, err)
+ assert.Equal(t, "pulsar", actualHost.Scheme)
+ assert.Equal(t, "fec0:0:0:ffff::1", actualHost.Hostname())
+ assert.Equal(t, "6650", actualHost.Port())
+ assert.Equal(t, "[fec0:0:0:ffff::1]:6650", actualHost.Host)
+}
diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go
index 7b753398..eccd22ad 100644
--- a/pulsar/internal/service_uri.go
+++ b/pulsar/internal/service_uri.go
@@ -21,7 +21,10 @@ import (
"errors"
"fmt"
"net"
+ "net/netip"
"net/url"
+ "slices"
+ "strconv"
"strings"
log "github.com/sirupsen/logrus"
@@ -46,6 +49,14 @@ type PulsarServiceURI struct {
URL *url.URL
}
+type UnsupportedServiceNameError struct {
+ ServiceName string
+}
+
+func (e *UnsupportedServiceNameError) Error() string {
+ return fmt.Sprintf("unsupported service name: %s", e.ServiceName)
+}
+
func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) {
u, err := fromString(uri)
if err != nil {
@@ -55,109 +66,86 @@ func NewPulsarServiceURIFromURIString(uri string)
(*PulsarServiceURI, error) {
return u, nil
}
-func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
- u, err := fromURL(url)
- if err != nil {
- log.Error(err)
- return nil, err
+func (p *PulsarServiceURI) UseTLS() bool {
+ return p.ServiceName == HTTPSService || slices.Contains(p.ServiceInfos,
SSLService)
+}
+
+func (p *PulsarServiceURI) PrimaryHostName() (string, error) {
+ if len(p.ServiceHosts) > 0 {
+ host, _, err := net.SplitHostPort(p.ServiceHosts[0])
+ if err != nil {
+ return "", err
+ }
+ return host, nil
}
- return u, nil
+
+ return "", errors.New("no hosts available in ServiceHosts")
}
-func fromString(uriStr string) (*PulsarServiceURI, error) {
- if uriStr == "" || len(uriStr) == 0 {
- return nil, errors.New("service uriStr string is null")
- }
- if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
- // deal with ipv6 address
- hosts := strings.FieldsFunc(uriStr, splitURI)
- if len(hosts) > 1 {
- // deal with ipv6 address
- firstHost := hosts[0]
- lastHost := hosts[len(hosts)-1]
- hasPath := strings.Contains(lastHost, "/")
- path := ""
- if hasPath {
- idx := strings.Index(lastHost, "/")
- path = lastHost[idx:]
- }
- firstHost += path
- url, err := url.Parse(firstHost)
- if err != nil {
- return nil, err
- }
- serviceURI, err := fromURL(url)
- if err != nil {
- return nil, err
- }
- var mHosts []string
- var multiHosts []string
- mHosts = append(mHosts, serviceURI.ServiceHosts[0])
- mHosts = append(mHosts, hosts[1:]...)
-
- for _, v := range mHosts {
- h, err :=
validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
- if err == nil {
- multiHosts = append(multiHosts, h)
- } else {
- return nil, err
- }
- }
+func (p *PulsarServiceURI) IsHTTP() bool {
+ return p.ServiceName == HTTPService || p.ServiceName == HTTPSService
+}
- return &PulsarServiceURI{
- serviceURI.ServiceName,
- serviceURI.ServiceInfos,
- multiHosts,
- serviceURI.servicePath,
- serviceURI.URL,
- }, nil
- }
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+ if uriStr == "" {
+ return nil, errors.New("service URI cannot be empty")
}
- url, err := url.Parse(uriStr)
+ // 1. Reduce a multi-host URI to one parseable host while preserving
suffixes.
+ singleHostURI, additionalHosts := splitHostURI(uriStr)
+
+ // 2. Parse single-host URI ONLY
+ u, err := url.Parse(singleHostURI)
if err != nil {
return nil, err
}
- return fromURL(url)
-}
+ if u.Host == "" {
+ return nil, errors.New("service host cannot be empty")
+ }
-func fromURL(url *url.URL) (*PulsarServiceURI, error) {
- if url == nil {
- return nil, errors.New("service url instance is null")
+ // 3. Parse scheme
+ scheme := strings.ToLower(u.Scheme)
+ if scheme == "" {
+ return nil, errors.New("service scheme cannot be empty")
}
- if url.Host == "" || len(url.Host) == 0 {
- return nil, errors.New("service host is null")
+ schemeParts := strings.Split(scheme, "+")
+ serviceName := schemeParts[0]
+ serviceInfos := schemeParts[1:]
+
+ // reject unknown scheme
+ switch serviceName {
+ case BinaryService, HTTPService, HTTPSService:
+ default:
+ return nil, &UnsupportedServiceNameError{ServiceName:
serviceName}
}
- var serviceName string
- var serviceInfos []string
- scheme := url.Scheme
- if scheme != "" {
- scheme = strings.ToLower(scheme)
- schemeParts := strings.Split(scheme, "+")
- serviceName = schemeParts[0]
- serviceInfos = schemeParts[1:]
+ // 4. Validate first host
+ firstHost, err := validateHostName(serviceName, serviceInfos, u.Host)
+ if err != nil {
+ return nil, err
}
- var serviceHosts []string
- hosts := strings.FieldsFunc(url.Host, splitURI)
- for _, v := range hosts {
- h, err := validateHostName(serviceName, serviceInfos, v)
- if err == nil {
- serviceHosts = append(serviceHosts, h)
- } else {
- return nil, err
+ serviceHosts := []string{firstHost}
+
+ // 5. Validate remaining hosts
+ if additionalHosts != "" {
+ for _, h := range strings.FieldsFunc(additionalHosts, splitURI)
{
+ host, err := validateHostName(serviceName,
serviceInfos, h)
+ if err != nil {
+ return nil, err
+ }
+ serviceHosts = append(serviceHosts, host)
}
}
return &PulsarServiceURI{
- serviceName,
- serviceInfos,
- serviceHosts,
- url.Path,
- url,
+ ServiceName: serviceName,
+ ServiceInfos: serviceInfos,
+ ServiceHosts: serviceHosts,
+ servicePath: u.Path,
+ URL: u,
}, nil
}
@@ -165,46 +153,140 @@ func splitURI(r rune) bool {
return r == ',' || r == ';'
}
+func splitHostURI(uriStr string) (string, string) {
+ authorityStart := strings.Index(uriStr, "//")
+ if authorityStart < 0 {
+ return uriStr, ""
+ }
+ authorityStart += 2
+
+ authorityEnd := len(uriStr)
+ if authoritySuffixEnd := strings.IndexAny(uriStr[authorityStart:],
"/?#"); authoritySuffixEnd >= 0 {
+ authorityEnd = authorityStart + authoritySuffixEnd
+ }
+
+ hostListStart := authorityStart
+ if userInfoEnd :=
strings.LastIndex(uriStr[authorityStart:authorityEnd], "@"); userInfoEnd >= 0 {
+ hostListStart += userInfoEnd + 1
+ }
+
+ if firstDelim := strings.IndexFunc(uriStr[hostListStart:authorityEnd],
splitURI); firstDelim >= 0 {
+ firstDelimIdx := hostListStart + firstDelim
+ return uriStr[:firstDelimIdx] + uriStr[authorityEnd:],
uriStr[firstDelimIdx+1 : authorityEnd]
+ }
+
+ return uriStr, ""
+}
+
func validateHostName(serviceName string, serviceInfos []string, hostname
string) (string, error) {
- uri, err := url.Parse("dummyscheme://" + hostname)
+ host, port, err := splitHostPortOrDefault(serviceName, serviceInfos,
hostname)
if err != nil {
return "", err
}
- host := uri.Hostname()
- if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
- host = fmt.Sprintf("[%s]", host)
- }
- if host == "" || uri.Scheme == "" {
- return "", errors.New("Invalid hostname : " + hostname)
+
+ cleanHost, err := normalizeValidatedHost(hostname, host)
+ if err != nil {
+ return "", err
}
- port := uri.Port()
- if uri.Port() == "" {
- p := getServicePort(serviceName, serviceInfos)
- if p == -1 {
- return "", fmt.Errorf("invalid port : %d", p)
+ return net.JoinHostPort(cleanHost, port), nil
+}
+
+func splitHostPortOrDefault(serviceName string, serviceInfos []string,
hostname string) (string, string, error) {
+ // net.SplitHostPort enforces strict host:port syntax:
+ // - IPv4: "127.0.0.1:6650"
+ // - IPv6: "[fec0::1]:6650"
+ //
+ // It will fail for:
+ // - hosts without a port
+ // - bare IPv6 literals without brackets (e.g. "fec0::1")
+ host, port, err := net.SplitHostPort(hostname)
+ if err == nil {
+ if host == "" {
+ // net.SplitHostPort accepts ":port" with an empty
host, but we explicitly
+ // reject such inputs because a non-empty hostname is
required.
+ return "", "", fmt.Errorf("invalid address: host is
empty in %q", hostname)
}
- port = fmt.Sprint(p)
+ return host, port, nil
}
- result := host + ":" + port
- _, _, err = net.SplitHostPort(result)
- if err != nil {
- return "", err
+
+ // If the hostname contains ':' but is not bracketed, it is very likely
+ // an invalid IPv6 literal or an invalid host with too many colons.
+ //
+ // Examples rejected here:
+ // - "fec0::1"
+ // - "fec0::1:6650"
+ // - "localhost:6650:6651"
+ if strings.Contains(hostname, ":") && !strings.HasPrefix(hostname, "[")
{
+ return "", "", fmt.Errorf("invalid address (maybe missing
brackets for IPv6 or too many colons): %s", hostname)
+ }
+
+ defaultPort := getServicePort(serviceName, serviceInfos)
+ if defaultPort == -1 {
+ return "", "", fmt.Errorf("no port found")
+ }
+
+ return hostname, strconv.Itoa(defaultPort), nil
+}
+
+func normalizeValidatedHost(hostname, host string) (string, error) {
+ hasOpeningBracket := strings.HasPrefix(hostname, "[")
+ hasClosingBracket := strings.Contains(hostname, "]")
+ if hasOpeningBracket != hasClosingBracket {
+ return "", fmt.Errorf("invalid bracketed host: %s", hostname)
+ }
+
+ cleanHost := strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
+ if !hasOpeningBracket {
+ return cleanHost, nil
+ }
+
+ addr, err := netip.ParseAddr(cleanHost)
+ if err != nil || !addr.Is6() {
+ return "", fmt.Errorf("invalid IPv6 address: %s", hostname)
}
- return result, nil
+
+ return cleanHost, nil
}
func getServicePort(serviceName string, serviceInfos []string) int {
- switch strings.ToLower(serviceName) {
+ switch serviceName {
case BinaryService:
+ // For Pulsar, only the "ssl" modifier is allowed. Any other
non-empty
+ // modifier is treated as invalid and causes port resolution to
fail.
if len(serviceInfos) == 0 {
return BinaryPort
- } else if len(serviceInfos) == 1 &&
strings.ToLower(serviceInfos[0]) == SSLService {
+ }
+
+ hasSSL := false
+ for _, info := range serviceInfos {
+ if info == "" {
+ // Ignore empty modifiers if present.
+ continue
+ }
+ if strings.EqualFold(info, SSLService) {
+ hasSSL = true
+ continue
+ }
+ // Unknown modifier: reject to avoid silently accepting
typos.
+ return -1
+ }
+
+ if hasSSL {
return BinaryTLSPort
}
+ return BinaryPort
case HTTPService:
+ // HTTP should not have any scheme modifiers; reject if present.
+ if len(serviceInfos) != 0 {
+ return -1
+ }
return HTTPPort
case HTTPSService:
+ // HTTPS should not have any scheme modifiers; reject if
present.
+ if len(serviceInfos) != 0 {
+ return -1
+ }
return HTTPSPort
}
return -1
diff --git a/pulsar/internal/service_uri_test.go
b/pulsar/internal/service_uri_test.go
index 445b3254..1c6c1dbb 100644
--- a/pulsar/internal/service_uri_test.go
+++ b/pulsar/internal/service_uri_test.go
@@ -18,9 +18,11 @@
package internal
import (
+ "errors"
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestInvalidServiceUris(t *testing.T) {
@@ -31,6 +33,8 @@ func TestInvalidServiceUris(t *testing.T) {
"pulsar://localhost:xyz/", // invalid port
"pulsar://localhost:-6650/", // negative port
"pulsar://fec0:0:0:ffff::1:6650", // missing brackets
+ "pulsar://[example]:6650", // invalid hostname
+ "pulsar://fec0::1", // missing brackets
}
for _, uri := range uris {
@@ -44,15 +48,41 @@ func TestEmptyServiceUriString(t *testing.T) {
assert.NotNil(t, err)
}
-func TestNullServiceUrlInstance(t *testing.T) {
- u, err := NewPulsarServiceURIFromURL(nil)
- assert.Nil(t, u)
- assert.NotNil(t, err)
-}
-
func TestMissingServiceName(t *testing.T) {
serviceURI := "//localhost:6650/path/to/namespace"
- assertServiceURI(t, serviceURI, "", nil, []string{"localhost:6650"},
"/path/to/namespace", "")
+ _, err := NewPulsarServiceURIFromURIString(serviceURI)
+ require.Error(t, err)
+}
+
+func TestUnsupportedServiceNameError(t *testing.T) {
+ _, err := NewPulsarServiceURIFromURIString("ftp://localhost:21")
+ require.Error(t, err)
+
+ var unsupportedServiceNameErr *UnsupportedServiceNameError
+ require.ErrorAs(t, err, &unsupportedServiceNameErr)
+ assert.Equal(t, "ftp", unsupportedServiceNameErr.ServiceName)
+ assert.True(t, errors.As(err, &unsupportedServiceNameErr))
+}
+
+func TestIsHTTP(t *testing.T) {
+ testCases := []struct {
+ name string
+ uri string
+ expected bool
+ }{
+ {name: "pulsar", uri: "pulsar://localhost:6650", expected:
false},
+ {name: "pulsar ssl", uri: "pulsar+ssl://localhost:6651",
expected: false},
+ {name: "http", uri: "http://localhost", expected: true},
+ {name: "https", uri: "https://localhost", expected: true},
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ serviceURI, err :=
NewPulsarServiceURIFromURIString(tc.uri)
+ require.NoError(t, err)
+ assert.Equal(t, tc.expected, serviceURI.IsHTTP())
+ })
+ }
}
func TestEmptyPath(t *testing.T) {
@@ -143,6 +173,21 @@ func TestMultipleHostsMixed(t *testing.T) {
"/path/to/namespace", "")
}
+func TestPathQueryAndFragmentDelimitersDoNotSplitHosts(t *testing.T) {
+ serviceURI :=
"pulsar://host1:6650/path,with;delimiters?param=a,b;c#frag,ment;tail"
+ uri, err := NewPulsarServiceURIFromURIString(serviceURI)
+ require.NoError(t, err)
+ require.NotNil(t, uri)
+ assert.Equal(t, []string{"host1:6650"}, uri.ServiceHosts)
+ assert.Equal(t, "/path,with;delimiters", uri.servicePath)
+ assert.Equal(t, "param=a,b;c", uri.URL.RawQuery)
+ assert.Equal(t, "frag,ment;tail", uri.URL.Fragment)
+}
+
+func TestInvalidBracketedAdditionalHost(t *testing.T) {
+ testInvalidServiceURI(t,
"pulsar://host1:6650,[example]:6650/path/to/namespace")
+}
+
func TestUserInfoWithMultipleHosts(t *testing.T) {
serviceURI :=
"pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace"
assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650",
"host2:6650", "host3:6650"},