This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d0db3403 improve: service URL parsing compatible with Go 1.26 (#1468)
d0db3403 is described below

commit d0db3403023d3390392db8562741691482af6df4
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Mar 21 12:35:00 2026 +0800

    improve: service URL parsing compatible with Go 1.26 (#1468)
    
    * improve: service URL parsing compatible with Go 1.26
    
    * ci: add Go 1.26 support
    
    * refactor: remove ResolveHostURI method and update tests for service URL 
resolution
    
    * improve: enhance service URL parsing and add tests for unsupported schemes
    
    * Update pulsar/internal/service_uri.go
    
    Co-authored-by: Copilot <[email protected]>
    
    * improve: simplify service URL parsing by removing unnecessary URI 
conversion
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: crossoverJie <[email protected]>
---
 .github/workflows/ci.yml                      |   8 +-
 Dockerfile                                    |   2 +-
 Makefile                                      |   2 +-
 README.md                                     |   4 +-
 pulsar/client_impl.go                         |  19 +-
 pulsar/client_impl_test.go                    |   6 +
 pulsar/internal/http_client.go                |   4 +-
 pulsar/internal/lookup_service.go             |   8 +-
 pulsar/internal/lookup_service_test.go        |  86 ++++----
 pulsar/internal/rpc_client.go                 |  34 ++-
 pulsar/internal/rpc_client_test.go            |  10 +-
 pulsar/internal/service_name_resolver.go      |  36 +---
 pulsar/internal/service_name_resolver_test.go |  82 ++++----
 pulsar/internal/service_uri.go                | 286 +++++++++++++++++---------
 pulsar/internal/service_uri_test.go           |  59 +++++-
 15 files changed, 368 insertions(+), 278 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f6634eaf..30c18fe3 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,7 +22,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        go-version: [ '1.23', '1.24' ]
+        go-version: [ '1.24', '1.26']
     steps:
       - uses: actions/checkout@v3
       - uses: actions/setup-go@v5
@@ -36,7 +36,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-go@v5
         with:
-          go-version: '1.23'
+          go-version: '1.24'
       - name: Check license header
         run: docker run --rm -v $(pwd):/github/workspace 
ghcr.io/korandoru/hawkeye-native:v3 check
       - name: Run golangci-lint
@@ -48,7 +48,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        go-version: [ '1.23', '1.24' ]
+        go-version: [ '1.24', '1.26' ]
     steps:
       - uses: actions/checkout@v3      
       - uses: actions/setup-go@v3
@@ -57,6 +57,8 @@ jobs:
       - name: Run Tests
         id: run-tests
         run: make test GO_VERSION=${{ matrix.go-version }}
+        env:
+          GODEBUG: urlstrictcolons=1
       - name: Upload logs on failure
         if: failure() && steps.run-tests.outcome == 'failure'
         uses: actions/upload-artifact@v4
diff --git a/Dockerfile b/Dockerfile
index 8716f737..44a10042 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,7 +19,7 @@
 # set via the Makefile or CLI
 ARG PULSAR_IMAGE=apachepulsar/pulsar:latest
 
-ARG GO_VERSION=1.23
+ARG GO_VERSION=1.24
 FROM golang:$GO_VERSION as golang
 
 FROM $PULSAR_IMAGE
diff --git a/Makefile b/Makefile
index bee9902a..7cdf21fd 100644
--- a/Makefile
+++ b/Makefile
@@ -20,7 +20,7 @@
 IMAGE_NAME = pulsar-client-go-test:latest
 PULSAR_VERSION ?= latest
 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
-GO_VERSION ?= 1.23
+GO_VERSION ?= 1.24
 CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/)
 
 # Golang standard bin directory.
diff --git a/README.md b/README.md
index c5e6d1c1..a3000b29 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ CGo-based library.
 
 ## Requirements
 
-- Go 1.23+
+- Go 1.24+
 
 ## Status
 
@@ -148,7 +148,7 @@ Run the tests:
 
 Run the tests with specific versions of GOLANG and PULSAR:
 
-    make test GO_VERSION=1.23 PULSAR_VERSION=4.0.3
+    make test GO_VERSION=1.24 PULSAR_VERSION=4.0.3
 
 ## Contributing
 
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index d940367f..f1cfeb04 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "fmt"
-       "net/url"
        "sync"
        "time"
 
@@ -80,31 +79,31 @@ func newClient(options ClientOptions) (Client, error) {
                return nil, newError(InvalidConfiguration, "URL is required for 
client")
        }
 
-       url, err := url.Parse(options.URL)
+       pulsarServiceURI, err := 
internal.NewPulsarServiceURIFromURIString(options.URL)
        if err != nil {
                logger.WithError(err).Error("Failed to parse service URL")
                return nil, newError(InvalidConfiguration, "Invalid service 
URL")
        }
 
        var tlsConfig *internal.TLSOptions
-       switch url.Scheme {
-       case "pulsar", "http":
-               tlsConfig = nil
-       case "pulsar+ssl", "https":
+       if pulsarServiceURI.UseTLS() {
+               hostName, err := pulsarServiceURI.PrimaryHostName()
+               if err != nil {
+                       return nil, err
+               }
+
                tlsConfig = &internal.TLSOptions{
                        AllowInsecureConnection: 
options.TLSAllowInsecureConnection,
                        KeyFile:                 options.TLSKeyFilePath,
                        CertFile:                options.TLSCertificateFile,
                        TrustCertsFilePath:      options.TLSTrustCertsFilePath,
                        ValidateHostname:        options.TLSValidateHostname,
-                       ServerName:              url.Hostname(),
+                       ServerName:              hostName,
                        CipherSuites:            options.TLSCipherSuites,
                        MinVersion:              options.TLSMinVersion,
                        MaxVersion:              options.TLSMaxVersion,
                        TLSConfig:               options.TLSConfig,
                }
-       default:
-               return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid 
URL scheme '%s'", url.Scheme))
        }
 
        var authProvider auth.Provider
@@ -175,7 +174,7 @@ func newClient(options ClientOptions) (Client, error) {
                tlsEnabled:       tlsConfig != nil,
        }
 
-       c.rpcClient, err = internal.NewRPCClient(url, c.cnxPool, 
operationTimeout, logger, metrics,
+       c.rpcClient, err = internal.NewRPCClient(options.URL, c.cnxPool, 
operationTimeout, logger, metrics,
                options.ListenerName, tlsConfig, authProvider, 
toKeyValues(options.LookupProperties))
        if err != nil {
                return nil, err
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 7a8c8e60..30754044 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -46,6 +46,12 @@ func TestClient(t *testing.T) {
        assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
 }
 
+func TestClientInvalidScheme(t *testing.T) {
+       client, err := NewClient(ClientOptions{URL: "ftp://localhost:21"})
+       require.Error(t, err)
+       assert.Nil(t, client)
+}
+
 func TestTLSConnectionCAError(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL:              serviceURLTLS,
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index b7dea1fe..38912206 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -59,13 +59,13 @@ type HTTPClient interface {
        Closable
 }
 
-func NewHTTPClient(serviceURL *url.URL, serviceNameResolver 
ServiceNameResolver, tlsConfig *TLSOptions,
+func NewHTTPClient(serviceNameResolver ServiceNameResolver, tlsConfig 
*TLSOptions,
        requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
        authProvider auth.Provider) (HTTPClient, error) {
        h := &httpClient{
                ServiceNameResolver: serviceNameResolver,
                requestTimeout:      requestTimeout,
-               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
+               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceNameResolver.GetServiceURI().URL}),
                metrics:             metrics,
        }
        c := &http.Client{Timeout: requestTimeout}
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index 19e51b41..876a1021 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -92,14 +92,14 @@ type lookupService struct {
 }
 
 // NewLookupService init a lookup service struct and return an object of 
LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, 
serviceNameResolver ServiceNameResolver,
+func NewLookupService(rpcClient RPCClient, serviceNameResolver 
ServiceNameResolver,
        tlsEnabled bool, listenerName string,
        lookupProperties []*pb.KeyValue, logger log.Logger, metrics *Metrics) 
LookupService {
        return &lookupService{
                rpcClient:           rpcClient,
                serviceNameResolver: serviceNameResolver,
                tlsEnabled:          tlsEnabled,
-               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
+               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceNameResolver.GetServiceURI().URL}),
                lookupProperties:    lookupProperties,
                metrics:             metrics,
                listenerName:        listenerName,
@@ -446,14 +446,14 @@ func (h *httpLookupService) Close() {
 }
 
 // NewHTTPLookupService init a http based lookup service struct and return an 
object of LookupService.
-func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, 
serviceNameResolver ServiceNameResolver,
+func NewHTTPLookupService(httpClient HTTPClient, serviceNameResolver 
ServiceNameResolver,
        tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
 
        return &httpLookupService{
                httpClient:          httpClient,
                serviceNameResolver: serviceNameResolver,
                tlsEnabled:          tlsEnabled,
-               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
+               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceNameResolver.GetServiceURI().URL}),
                metrics:             metrics,
        }
 }
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index b8467137..e12eec4b 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/pkg/errors"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/proto"
 
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
@@ -131,9 +132,8 @@ func responseType(r 
pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT
 }
 
 func TestLookupSuccess(t *testing.T) {
-       url, err := url.Parse("pulsar://example:6650")
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://example:6650")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t: t,
@@ -157,7 +157,7 @@ func TestLookupSuccess(t *testing.T) {
                },
        }
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, serviceNameResolver, false, 
"", kvs, log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, false, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -168,9 +168,8 @@ func TestLookupSuccess(t *testing.T) {
 }
 
 func TestTlsLookupSuccess(t *testing.T) {
-       url, err := url.Parse("pulsar+ssl://example:6651")
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar+ssl://example:6651")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t: t,
@@ -195,7 +194,7 @@ func TestTlsLookupSuccess(t *testing.T) {
        }
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
 
-       ls := NewLookupService(mockedClient, url, serviceNameResolver, true, 
"", kvs, log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, true, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -206,9 +205,8 @@ func TestTlsLookupSuccess(t *testing.T) {
 }
 
 func TestLookupWithProxy(t *testing.T) {
-       url, err := url.Parse("pulsar://example:6650")
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://example:6650")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t: t,
@@ -233,7 +231,7 @@ func TestLookupWithProxy(t *testing.T) {
                },
        }
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, serviceNameResolver, false, 
"", kvs, log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, false, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -244,8 +242,8 @@ func TestLookupWithProxy(t *testing.T) {
 }
 
 func TestTlsLookupWithProxy(t *testing.T) {
-       url, err := url.Parse("pulsar+ssl://example:6651")
-       assert.NoError(t, err)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar+ssl://example:6651")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t: t,
@@ -269,9 +267,8 @@ func TestTlsLookupWithProxy(t *testing.T) {
                        },
                },
        }
-       resolver := NewPulsarServiceNameResolver(url)
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, 
log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, true, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -282,8 +279,8 @@ func TestTlsLookupWithProxy(t *testing.T) {
 }
 
 func TestLookupWithRedirect(t *testing.T) {
-       url, err := url.Parse("pulsar://example:6650")
-       assert.NoError(t, err)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://example:6650")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t:           t,
@@ -319,9 +316,8 @@ func TestLookupWithRedirect(t *testing.T) {
                        },
                },
        }
-       resolver := NewPulsarServiceNameResolver(url)
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, 
log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, false, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -332,8 +328,8 @@ func TestLookupWithRedirect(t *testing.T) {
 }
 
 func TestTlsLookupWithRedirect(t *testing.T) {
-       url, err := url.Parse("pulsar+ssl://example:6651")
-       assert.NoError(t, err)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar+ssl://example:6651")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t:           t,
@@ -370,9 +366,8 @@ func TestTlsLookupWithRedirect(t *testing.T) {
                },
        }
 
-       resolver := NewPulsarServiceNameResolver(url)
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, 
log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, true, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -383,8 +378,8 @@ func TestTlsLookupWithRedirect(t *testing.T) {
 }
 
 func TestLookupWithInvalidUrlResponse(t *testing.T) {
-       url, err := url.Parse("pulsar://example:6650")
-       assert.NoError(t, err)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://example:6650")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t: t,
@@ -408,9 +403,8 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
                        },
                },
        }
-       resolver := NewPulsarServiceNameResolver(url)
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, 
log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, false, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -418,8 +412,8 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
 }
 
 func TestLookupWithLookupFailure(t *testing.T) {
-       url, err := url.Parse("pulsar://example:6650")
-       assert.NoError(t, err)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://example:6650")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        mockedClient := &mockedLookupRPCClient{
                t: t,
@@ -442,9 +436,8 @@ func TestLookupWithLookupFailure(t *testing.T) {
                },
        }
 
-       resolver := NewPulsarServiceNameResolver(url)
        metricsProvider := NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer)
-       ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, 
log.DefaultNopLogger(), metricsProvider)
+       ls := NewLookupService(mockedClient, serviceNameResolver, false, "", 
kvs, log.DefaultNopLogger(), metricsProvider)
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -526,9 +519,8 @@ func (m *mockedPartitionedTopicMetadataRPCClient) 
LookupService(_ string) (Looku
 }
 
 func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
-       url, err := url.Parse("pulsar://example:6650")
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://example:6650")
+       require.NoError(t, err)
 
        kvs := make([]*pb.KeyValue, 0)
        ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
@@ -547,7 +539,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
                                Response:   
pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
                        },
                },
-       }, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
+       }, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
                NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer))
 
        metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
@@ -557,10 +549,8 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
 }
 
 func TestLookupSuccessWithMultipleHosts(t *testing.T) {
-       url, err := url.Parse("pulsar://host1,host2,host3:6650")
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
-
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("pulsar://host1:6650,host2:6650,host3:6650")
+       require.NoError(t, err)
        kvs := make([]*pb.KeyValue, 0)
        ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
@@ -582,7 +572,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
+       }, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
                NewMetricsProvider(4, map[string]string{}, 
prometheus.DefaultRegisterer))
 
        lr, err := ls.Lookup("my-topic")
@@ -639,11 +629,10 @@ func NewMockHTTPClient(serviceNameResolver 
ServiceNameResolver) HTTPClient {
 }
 
 func TestHttpLookupSuccess(t *testing.T) {
-       url, err := url.Parse("http://broker-1:8080";)
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("http://broker-1:8080";)
+       require.NoError(t, err)
        httpClient := NewMockHTTPClient(serviceNameResolver)
-       ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
+       ls := NewHTTPLookupService(httpClient, serviceNameResolver, false,
                log.DefaultNopLogger(), NewMetricsProvider(4, 
map[string]string{}, prometheus.DefaultRegisterer))
 
        lr, err := ls.Lookup("my-topic")
@@ -655,11 +644,10 @@ func TestHttpLookupSuccess(t *testing.T) {
 }
 
 func TestHttpGetPartitionedTopicMetadataSuccess(t *testing.T) {
-       url, err := url.Parse("http://broker-1:8080";)
-       assert.NoError(t, err)
-       serviceNameResolver := NewPulsarServiceNameResolver(url)
+       serviceNameResolver, err := 
NewPulsarServiceNameResolver("http://broker-1:8080";)
+       require.NoError(t, err)
        httpClient := NewMockHTTPClient(serviceNameResolver)
-       ls := NewHTTPLookupService(httpClient, url, serviceNameResolver, false,
+       ls := NewHTTPLookupService(httpClient, serviceNameResolver, false,
                log.DefaultNopLogger(), NewMetricsProvider(4, 
map[string]string{}, prometheus.DefaultRegisterer))
 
        tMetadata, err := ls.GetPartitionedTopicMetadata("my-topic")
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 68cf1347..85e048ef 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -94,7 +94,7 @@ type rpcClient struct {
        lookupProperties        []*pb.KeyValue
 }
 
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+func NewRPCClient(serviceURL string, pool ConnectionPool,
        requestTimeout time.Duration, logger log.Logger, metrics *Metrics,
        listenerName string, tlsConfig *TLSOptions, authProvider auth.Provider,
        lookupProperties []*pb.KeyValue) (RPCClient, error) {
@@ -109,7 +109,7 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
                urlLookupServiceMap: make(map[string]LookupService),
                lookupProperties:    lookupProperties,
        }
-       lookupService, err := c.NewLookupService(serviceURL)
+       lookupService, err := c.LookupService(serviceURL)
        if err != nil {
                return nil, fmt.Errorf("failed to create lookup service: %w", 
err)
        }
@@ -232,12 +232,7 @@ func (c *rpcClient) LookupService(URL string) 
(LookupService, error) {
                return lookupService, nil
        }
 
-       serviceURL, err := url.Parse(URL)
-       if err != nil {
-               return nil, fmt.Errorf("failed to parse URL '%s': %w", URL, err)
-       }
-
-       lookupService, err = c.NewLookupService(serviceURL)
+       lookupService, err := c.newLookupService(URL)
        if err != nil {
                return nil, fmt.Errorf("failed to create lookup service for URL 
'%s': %w", URL, err)
        }
@@ -245,26 +240,25 @@ func (c *rpcClient) LookupService(URL string) 
(LookupService, error) {
        return lookupService, nil
 }
 
-func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
+func (c *rpcClient) newLookupService(serviceURL string) (LookupService, error) 
{
+       serviceNameResolver, err := NewPulsarServiceNameResolver(serviceURL)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create service name resolver 
for URL '%s': %w", serviceURL, err)
+       }
 
-       switch url.Scheme {
-       case "pulsar", "pulsar+ssl":
-               serviceNameResolver := NewPulsarServiceNameResolver(url)
-               return NewLookupService(c, url, serviceNameResolver,
-                       c.tlsConfig != nil, c.listenerName, c.lookupProperties, 
c.log, c.metrics), nil
-       case "http", "https":
-               serviceNameResolver := NewPulsarServiceNameResolver(url)
-               httpClient, err := NewHTTPClient(url, serviceNameResolver, 
c.tlsConfig,
+       if serviceNameResolver.GetServiceURI().IsHTTP() {
+               httpClient, err := NewHTTPClient(serviceNameResolver, 
c.tlsConfig,
                        c.requestTimeout, c.log, c.metrics, c.authProvider)
                if err != nil {
                        return nil, err
                }
 
                return NewHTTPLookupService(
-                       httpClient, url, serviceNameResolver, c.tlsConfig != 
nil, c.log, c.metrics), nil
-       default:
-               return nil, fmt.Errorf("invalid URL scheme '%s'", url.Scheme)
+                       httpClient, serviceNameResolver, c.tlsConfig != nil, 
c.log, c.metrics), nil
        }
+
+       return NewLookupService(c, serviceNameResolver,
+               c.tlsConfig != nil, c.listenerName, c.lookupProperties, c.log, 
c.metrics), nil
 }
 
 func (c *rpcClient) Close() {
diff --git a/pulsar/internal/rpc_client_test.go 
b/pulsar/internal/rpc_client_test.go
index 08a2013e..e9623d5a 100644
--- a/pulsar/internal/rpc_client_test.go
+++ b/pulsar/internal/rpc_client_test.go
@@ -18,7 +18,6 @@
 package internal
 
 import (
-       "net/url"
        "testing"
 
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -27,34 +26,31 @@ import (
 
 func TestNewRPCClient_InvalidURL_ShouldNotPanic(t *testing.T) {
        // Test that NewRPCClient doesn't panic with invalid URL
-       invalidURL, _ := url.Parse("invalid://scheme")
+       invalidURL := "invalid://scheme"
 
        // This should not panic and should return an error
        _, err := NewRPCClient(invalidURL, nil, 0, log.DefaultNopLogger(), nil, 
"", nil, nil, nil)
        assert.Error(t, err)
-       assert.Contains(t, err.Error(), "invalid URL scheme")
 }
 
 func TestLookupService_InvalidURL_ShouldNotPanic(t *testing.T) {
        // Create a minimal RPC client for testing
-       validURL, _ := url.Parse("pulsar://localhost:6650")
+       validURL := "pulsar://localhost:6650"
        rpcClient, err := NewRPCClient(validURL, nil, 0, 
log.DefaultNopLogger(), nil, "", nil, nil, nil)
        assert.NoError(t, err)
 
        // Test that LookupService doesn't panic with invalid URL
        _, err = rpcClient.LookupService("invalid://url")
        assert.Error(t, err)
-       assert.Contains(t, err.Error(), "invalid URL scheme")
 }
 
 func TestLookupService_InvalidScheme_ShouldNotPanic(t *testing.T) {
        // Create a minimal RPC client for testing
-       validURL, _ := url.Parse("pulsar://localhost:6650")
+       validURL := "pulsar://localhost:6650"
        rpcClient, err := NewRPCClient(validURL, nil, 0, 
log.DefaultNopLogger(), nil, "", nil, nil, nil)
        assert.NoError(t, err)
 
        // Test that LookupService doesn't panic with invalid scheme
        _, err = rpcClient.LookupService("ftp://localhost:21";)
        assert.Error(t, err)
-       assert.Contains(t, err.Error(), "invalid URL scheme")
 }
diff --git a/pulsar/internal/service_name_resolver.go 
b/pulsar/internal/service_name_resolver.go
index a8fabb22..29a57edc 100644
--- a/pulsar/internal/service_name_resolver.go
+++ b/pulsar/internal/service_name_resolver.go
@@ -30,16 +30,13 @@ import (
 
 type ServiceNameResolver interface {
        ResolveHost() (*url.URL, error)
-       ResolveHostURI() (*PulsarServiceURI, error)
-       UpdateServiceURL(url *url.URL) error
+       UpdateServiceURL(serviceURL string) error
        GetServiceURI() *PulsarServiceURI
-       GetServiceURL() *url.URL
        GetAddressList() []*url.URL
 }
 
 type pulsarServiceNameResolver struct {
        ServiceURI   *PulsarServiceURI
-       ServiceURL   *url.URL
        CurrentIndex int32
        AddressList  []*url.URL
 
@@ -47,13 +44,12 @@ type pulsarServiceNameResolver struct {
        mutex sync.Mutex
 }
 
-func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
+func NewPulsarServiceNameResolver(serviceURL string) (ServiceNameResolver, 
error) {
        r := &pulsarServiceNameResolver{rnd: 
rand.New(rand.NewSource(time.Now().UnixNano()))}
-       err := r.UpdateServiceURL(url)
-       if err != nil {
-               log.Errorf("create pulsar service name resolver failed : %v", 
err)
+       if len(serviceURL) > 0 {
+               return r, r.UpdateServiceURL(serviceURL)
        }
-       return r
+       return r, nil
 }
 
 func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
@@ -64,7 +60,7 @@ func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, 
error) {
                return nil, errors.New("no service url is provided yet")
        }
        if len(r.AddressList) == 0 {
-               return nil, fmt.Errorf("no hosts found for service url : %v", 
r.ServiceURL)
+               return nil, fmt.Errorf("no hosts found for service url : %v", 
r.ServiceURI)
        }
        if len(r.AddressList) == 1 {
                return r.AddressList[0], nil
@@ -74,19 +70,10 @@ func (r *pulsarServiceNameResolver) ResolveHost() 
(*url.URL, error) {
        return r.AddressList[idx], nil
 }
 
-func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, 
error) {
-       host, err := r.ResolveHost()
+func (r *pulsarServiceNameResolver) UpdateServiceURL(serviceURL string) error {
+       uri, err := NewPulsarServiceURIFromURIString(serviceURL)
        if err != nil {
-               return nil, err
-       }
-       hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port()
-       return NewPulsarServiceURIFromURIString(hostURL)
-}
-
-func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error {
-       uri, err := NewPulsarServiceURIFromURL(u)
-       if err != nil {
-               log.Errorf("invalid service-url instance %s provided %v", u, 
err)
+               log.Errorf("invalid service-url instance %s provided %v", 
serviceURL, err)
                return err
        }
 
@@ -106,7 +93,6 @@ func (r *pulsarServiceNameResolver) UpdateServiceURL(u 
*url.URL) error {
        defer r.mutex.Unlock()
 
        r.AddressList = addresses
-       r.ServiceURL = u
        r.ServiceURI = uri
        r.CurrentIndex = int32(r.rnd.Intn(len(addresses)))
        return nil
@@ -116,10 +102,6 @@ func (r *pulsarServiceNameResolver) GetServiceURI() 
*PulsarServiceURI {
        return r.ServiceURI
 }
 
-func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL {
-       return r.ServiceURL
-}
-
 func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL {
        return r.AddressList
 }
diff --git a/pulsar/internal/service_name_resolver_test.go 
b/pulsar/internal/service_name_resolver_test.go
index e1906339..51061c6a 100644
--- a/pulsar/internal/service_name_resolver_test.go
+++ b/pulsar/internal/service_name_resolver_test.go
@@ -22,40 +22,33 @@ import (
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 func TestResolveBeforeUpdateServiceUrl(t *testing.T) {
-       resolver := NewPulsarServiceNameResolver(nil)
+       resolver, err := NewPulsarServiceNameResolver("")
+       require.NoError(t, err)
        u, err := resolver.ResolveHost()
        assert.Nil(t, u)
        assert.NotNil(t, err)
        assert.EqualError(t, err, "no service url is provided yet")
 }
 
-func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) {
-       resolver := NewPulsarServiceNameResolver(nil)
-       u, err := resolver.ResolveHostURI()
-       assert.Nil(t, u)
-       assert.NotNil(t, err)
-       assert.EqualError(t, err, "no service url is provided yet")
-}
-
 func TestUpdateInvalidServiceUrl(t *testing.T) {
-       resolver := NewPulsarServiceNameResolver(nil)
-       url, _ := url.Parse("pulsar:///")
-       err := resolver.UpdateServiceURL(url)
+       resolver, err := NewPulsarServiceNameResolver("")
+       require.NoError(t, err)
+       err = resolver.UpdateServiceURL("pulsar:///")
        assert.NotNil(t, err)
-       assert.Empty(t, resolver.GetServiceURL())
        assert.Nil(t, resolver.GetServiceURI())
 }
 
 func TestSimpleHostUrl(t *testing.T) {
-       resolver := NewPulsarServiceNameResolver(nil)
-       serviceURL, _ := url.Parse("pulsar://host1:6650")
-       err := resolver.UpdateServiceURL(serviceURL)
+       resolver, err := NewPulsarServiceNameResolver("")
+       require.NoError(t, err)
+       serviceURL := "pulsar://host1:6650"
+       err = resolver.UpdateServiceURL(serviceURL)
        assert.Nil(t, err)
-       assert.Equal(t, serviceURL, resolver.GetServiceURL())
-       expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+       expectedURI, err := NewPulsarServiceURIFromURIString(serviceURL)
        assert.Nil(t, err)
        assert.Equal(t, expectedURI, resolver.GetServiceURI())
        actualHost, err := resolver.ResolveHost()
@@ -63,11 +56,10 @@ func TestSimpleHostUrl(t *testing.T) {
        assert.Equal(t, "host1", actualHost.Hostname())
        assert.Equal(t, "6650", actualHost.Port())
 
-       newServiceURL, _ := url.Parse("pulsar://host2:6650")
+       newServiceURL := "pulsar://host2:6650"
        err = resolver.UpdateServiceURL(newServiceURL)
        assert.Nil(t, err)
-       assert.Equal(t, newServiceURL, resolver.GetServiceURL())
-       expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL)
+       expectedURI, err = NewPulsarServiceURIFromURIString(newServiceURL)
        assert.Nil(t, err)
        assert.Equal(t, expectedURI, resolver.GetServiceURI())
        actualHost, err = resolver.ResolveHost()
@@ -77,55 +69,59 @@ func TestSimpleHostUrl(t *testing.T) {
 }
 
 func TestMultipleHostsUrl(t *testing.T) {
-       resolver := NewPulsarServiceNameResolver(nil)
-       serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650")
-       err := resolver.UpdateServiceURL(serviceURL)
+       resolver, err := NewPulsarServiceNameResolver("")
+       require.NoError(t, err)
+       serviceURL := "pulsar://host1:6650,host2:6650"
+       err = resolver.UpdateServiceURL(serviceURL)
        assert.Nil(t, err)
-       assert.Equal(t, serviceURL, resolver.GetServiceURL())
-       expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+       expectedURI, err := NewPulsarServiceURIFromURIString(serviceURL)
        assert.Nil(t, err)
        assert.Equal(t, expectedURI, resolver.GetServiceURI())
        host1, _ := url.Parse("pulsar://host1:6650")
        host2, _ := url.Parse("pulsar://host2:6650")
-       host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650")
-       host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650")
        assert.Contains(t, resolver.GetAddressList(), host1)
        assert.Contains(t, resolver.GetAddressList(), host2)
        hosts := []*url.URL{host1, host2}
-       hosturis := []*PulsarServiceURI{host1uri, host2uri}
        for i := 0; i < 10; i++ {
                host, err := resolver.ResolveHost()
                assert.Nil(t, err)
-               hosturi, err := resolver.ResolveHostURI()
-               assert.Nil(t, err)
                assert.Contains(t, hosts, host)
-               assert.Contains(t, hosturis, hosturi)
        }
 }
 
 func TestMultipleHostsTlsUrl(t *testing.T) {
-       resolver := NewPulsarServiceNameResolver(nil)
-       serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651")
-       err := resolver.UpdateServiceURL(serviceURL)
+       resolver, err := NewPulsarServiceNameResolver("")
+       require.NoError(t, err)
+       serviceURL := "pulsar+ssl://host1:6651,host2:6651"
+       err = resolver.UpdateServiceURL(serviceURL)
        assert.Nil(t, err)
-       assert.Equal(t, serviceURL, resolver.GetServiceURL())
-       expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+       expectedURI, err := NewPulsarServiceURIFromURIString(serviceURL)
        assert.Nil(t, err)
        assert.Equal(t, expectedURI, resolver.GetServiceURI())
        host1, _ := url.Parse("pulsar+ssl://host1:6651")
        host2, _ := url.Parse("pulsar+ssl://host2:6651")
-       host1uri, _ := 
NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651")
-       host2uri, _ := 
NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651")
        assert.Contains(t, resolver.GetAddressList(), host1)
        assert.Contains(t, resolver.GetAddressList(), host2)
        hosts := []*url.URL{host1, host2}
-       hosturis := []*PulsarServiceURI{host1uri, host2uri}
        for i := 0; i < 10; i++ {
                host, err := resolver.ResolveHost()
                assert.Nil(t, err)
-               hosturi, err := resolver.ResolveHostURI()
-               assert.Nil(t, err)
                assert.Contains(t, hosts, host)
-               assert.Contains(t, hosturis, hosturi)
        }
 }
+
+func TestResolveIpv6Host(t *testing.T) {
+       resolver, err := NewPulsarServiceNameResolver("")
+       require.NoError(t, err)
+
+       serviceURL := "pulsar://[fec0:0:0:ffff::1]:6650"
+       err = resolver.UpdateServiceURL(serviceURL)
+       require.NoError(t, err)
+
+       actualHost, err := resolver.ResolveHost()
+       require.NoError(t, err)
+       assert.Equal(t, "pulsar", actualHost.Scheme)
+       assert.Equal(t, "fec0:0:0:ffff::1", actualHost.Hostname())
+       assert.Equal(t, "6650", actualHost.Port())
+       assert.Equal(t, "[fec0:0:0:ffff::1]:6650", actualHost.Host)
+}
diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go
index 7b753398..eccd22ad 100644
--- a/pulsar/internal/service_uri.go
+++ b/pulsar/internal/service_uri.go
@@ -21,7 +21,10 @@ import (
        "errors"
        "fmt"
        "net"
+       "net/netip"
        "net/url"
+       "slices"
+       "strconv"
        "strings"
 
        log "github.com/sirupsen/logrus"
@@ -46,6 +49,14 @@ type PulsarServiceURI struct {
        URL          *url.URL
 }
 
+type UnsupportedServiceNameError struct {
+       ServiceName string
+}
+
+func (e *UnsupportedServiceNameError) Error() string {
+       return fmt.Sprintf("unsupported service name: %s", e.ServiceName)
+}
+
 func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) {
        u, err := fromString(uri)
        if err != nil {
@@ -55,109 +66,86 @@ func NewPulsarServiceURIFromURIString(uri string) 
(*PulsarServiceURI, error) {
        return u, nil
 }
 
-func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
-       u, err := fromURL(url)
-       if err != nil {
-               log.Error(err)
-               return nil, err
+func (p *PulsarServiceURI) UseTLS() bool {
+       return p.ServiceName == HTTPSService || slices.Contains(p.ServiceInfos, 
SSLService)
+}
+
+func (p *PulsarServiceURI) PrimaryHostName() (string, error) {
+       if len(p.ServiceHosts) > 0 {
+               host, _, err := net.SplitHostPort(p.ServiceHosts[0])
+               if err != nil {
+                       return "", err
+               }
+               return host, nil
        }
-       return u, nil
+
+       return "", errors.New("no hosts available in ServiceHosts")
 }
 
-func fromString(uriStr string) (*PulsarServiceURI, error) {
-       if uriStr == "" || len(uriStr) == 0 {
-               return nil, errors.New("service uriStr string is null")
-       }
-       if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
-               // deal with ipv6 address
-               hosts := strings.FieldsFunc(uriStr, splitURI)
-               if len(hosts) > 1 {
-                       // deal with ipv6 address
-                       firstHost := hosts[0]
-                       lastHost := hosts[len(hosts)-1]
-                       hasPath := strings.Contains(lastHost, "/")
-                       path := ""
-                       if hasPath {
-                               idx := strings.Index(lastHost, "/")
-                               path = lastHost[idx:]
-                       }
-                       firstHost += path
-                       url, err := url.Parse(firstHost)
-                       if err != nil {
-                               return nil, err
-                       }
-                       serviceURI, err := fromURL(url)
-                       if err != nil {
-                               return nil, err
-                       }
-                       var mHosts []string
-                       var multiHosts []string
-                       mHosts = append(mHosts, serviceURI.ServiceHosts[0])
-                       mHosts = append(mHosts, hosts[1:]...)
-
-                       for _, v := range mHosts {
-                               h, err := 
validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
-                               if err == nil {
-                                       multiHosts = append(multiHosts, h)
-                               } else {
-                                       return nil, err
-                               }
-                       }
+func (p *PulsarServiceURI) IsHTTP() bool {
+       return p.ServiceName == HTTPService || p.ServiceName == HTTPSService
+}
 
-                       return &PulsarServiceURI{
-                               serviceURI.ServiceName,
-                               serviceURI.ServiceInfos,
-                               multiHosts,
-                               serviceURI.servicePath,
-                               serviceURI.URL,
-                       }, nil
-               }
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+       if uriStr == "" {
+               return nil, errors.New("service URI cannot be empty")
        }
 
-       url, err := url.Parse(uriStr)
+       // 1. Reduce a multi-host URI to one parseable host while preserving 
suffixes.
+       singleHostURI, additionalHosts := splitHostURI(uriStr)
+
+       // 2. Parse single-host URI ONLY
+       u, err := url.Parse(singleHostURI)
        if err != nil {
                return nil, err
        }
 
-       return fromURL(url)
-}
+       if u.Host == "" {
+               return nil, errors.New("service host cannot be empty")
+       }
 
-func fromURL(url *url.URL) (*PulsarServiceURI, error) {
-       if url == nil {
-               return nil, errors.New("service url instance is null")
+       // 3. Parse scheme
+       scheme := strings.ToLower(u.Scheme)
+       if scheme == "" {
+               return nil, errors.New("service scheme cannot be empty")
        }
 
-       if url.Host == "" || len(url.Host) == 0 {
-               return nil, errors.New("service host is null")
+       schemeParts := strings.Split(scheme, "+")
+       serviceName := schemeParts[0]
+       serviceInfos := schemeParts[1:]
+
+       // reject unknown scheme
+       switch serviceName {
+       case BinaryService, HTTPService, HTTPSService:
+       default:
+               return nil, &UnsupportedServiceNameError{ServiceName: 
serviceName}
        }
 
-       var serviceName string
-       var serviceInfos []string
-       scheme := url.Scheme
-       if scheme != "" {
-               scheme = strings.ToLower(scheme)
-               schemeParts := strings.Split(scheme, "+")
-               serviceName = schemeParts[0]
-               serviceInfos = schemeParts[1:]
+       // 4. Validate first host
+       firstHost, err := validateHostName(serviceName, serviceInfos, u.Host)
+       if err != nil {
+               return nil, err
        }
 
-       var serviceHosts []string
-       hosts := strings.FieldsFunc(url.Host, splitURI)
-       for _, v := range hosts {
-               h, err := validateHostName(serviceName, serviceInfos, v)
-               if err == nil {
-                       serviceHosts = append(serviceHosts, h)
-               } else {
-                       return nil, err
+       serviceHosts := []string{firstHost}
+
+       // 5. Validate remaining hosts
+       if additionalHosts != "" {
+               for _, h := range strings.FieldsFunc(additionalHosts, splitURI) 
{
+                       host, err := validateHostName(serviceName, 
serviceInfos, h)
+                       if err != nil {
+                               return nil, err
+                       }
+                       serviceHosts = append(serviceHosts, host)
                }
        }
 
        return &PulsarServiceURI{
-               serviceName,
-               serviceInfos,
-               serviceHosts,
-               url.Path,
-               url,
+               ServiceName:  serviceName,
+               ServiceInfos: serviceInfos,
+               ServiceHosts: serviceHosts,
+               servicePath:  u.Path,
+               URL:          u,
        }, nil
 }
 
@@ -165,46 +153,140 @@ func splitURI(r rune) bool {
        return r == ',' || r == ';'
 }
 
+func splitHostURI(uriStr string) (string, string) {
+       authorityStart := strings.Index(uriStr, "//")
+       if authorityStart < 0 {
+               return uriStr, ""
+       }
+       authorityStart += 2
+
+       authorityEnd := len(uriStr)
+       if authoritySuffixEnd := strings.IndexAny(uriStr[authorityStart:], 
"/?#"); authoritySuffixEnd >= 0 {
+               authorityEnd = authorityStart + authoritySuffixEnd
+       }
+
+       hostListStart := authorityStart
+       if userInfoEnd := 
strings.LastIndex(uriStr[authorityStart:authorityEnd], "@"); userInfoEnd >= 0 {
+               hostListStart += userInfoEnd + 1
+       }
+
+       if firstDelim := strings.IndexFunc(uriStr[hostListStart:authorityEnd], 
splitURI); firstDelim >= 0 {
+               firstDelimIdx := hostListStart + firstDelim
+               return uriStr[:firstDelimIdx] + uriStr[authorityEnd:], 
uriStr[firstDelimIdx+1 : authorityEnd]
+       }
+
+       return uriStr, ""
+}
+
 func validateHostName(serviceName string, serviceInfos []string, hostname 
string) (string, error) {
-       uri, err := url.Parse("dummyscheme://" + hostname)
+       host, port, err := splitHostPortOrDefault(serviceName, serviceInfos, 
hostname)
        if err != nil {
                return "", err
        }
-       host := uri.Hostname()
-       if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
-               host = fmt.Sprintf("[%s]", host)
-       }
-       if host == "" || uri.Scheme == "" {
-               return "", errors.New("Invalid hostname : " + hostname)
+
+       cleanHost, err := normalizeValidatedHost(hostname, host)
+       if err != nil {
+               return "", err
        }
 
-       port := uri.Port()
-       if uri.Port() == "" {
-               p := getServicePort(serviceName, serviceInfos)
-               if p == -1 {
-                       return "", fmt.Errorf("invalid port : %d", p)
+       return net.JoinHostPort(cleanHost, port), nil
+}
+
+func splitHostPortOrDefault(serviceName string, serviceInfos []string, 
hostname string) (string, string, error) {
+       // net.SplitHostPort enforces strict host:port syntax:
+       //   - IPv4: "127.0.0.1:6650"
+       //   - IPv6: "[fec0::1]:6650"
+       //
+       // It will fail for:
+       //   - hosts without a port
+       //   - bare IPv6 literals without brackets (e.g. "fec0::1")
+       host, port, err := net.SplitHostPort(hostname)
+       if err == nil {
+               if host == "" {
+                       // net.SplitHostPort accepts ":port" with an empty 
host, but we explicitly
+                       // reject such inputs because a non-empty hostname is 
required.
+                       return "", "", fmt.Errorf("invalid address: host is 
empty in %q", hostname)
                }
-               port = fmt.Sprint(p)
+               return host, port, nil
        }
-       result := host + ":" + port
-       _, _, err = net.SplitHostPort(result)
-       if err != nil {
-               return "", err
+
+       // If the hostname contains ':' but is not bracketed, it is very likely
+       // an invalid IPv6 literal or an invalid host with too many colons.
+       //
+       // Examples rejected here:
+       //   - "fec0::1"
+       //   - "fec0::1:6650"
+       //   - "localhost:6650:6651"
+       if strings.Contains(hostname, ":") && !strings.HasPrefix(hostname, "[") 
{
+               return "", "", fmt.Errorf("invalid address (maybe missing 
brackets for IPv6 or too many colons): %s", hostname)
+       }
+
+       defaultPort := getServicePort(serviceName, serviceInfos)
+       if defaultPort == -1 {
+               return "", "", fmt.Errorf("no port found")
+       }
+
+       return hostname, strconv.Itoa(defaultPort), nil
+}
+
+func normalizeValidatedHost(hostname, host string) (string, error) {
+       hasOpeningBracket := strings.HasPrefix(hostname, "[")
+       hasClosingBracket := strings.Contains(hostname, "]")
+       if hasOpeningBracket != hasClosingBracket {
+               return "", fmt.Errorf("invalid bracketed host: %s", hostname)
+       }
+
+       cleanHost := strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
+       if !hasOpeningBracket {
+               return cleanHost, nil
+       }
+
+       addr, err := netip.ParseAddr(cleanHost)
+       if err != nil || !addr.Is6() {
+               return "", fmt.Errorf("invalid IPv6 address: %s", hostname)
        }
-       return result, nil
+
+       return cleanHost, nil
 }
 
 func getServicePort(serviceName string, serviceInfos []string) int {
-       switch strings.ToLower(serviceName) {
+       switch serviceName {
        case BinaryService:
+               // For Pulsar, only the "ssl" modifier is allowed. Any other 
non-empty
+               // modifier is treated as invalid and causes port resolution to 
fail.
                if len(serviceInfos) == 0 {
                        return BinaryPort
-               } else if len(serviceInfos) == 1 && 
strings.ToLower(serviceInfos[0]) == SSLService {
+               }
+
+               hasSSL := false
+               for _, info := range serviceInfos {
+                       if info == "" {
+                               // Ignore empty modifiers if present.
+                               continue
+                       }
+                       if strings.EqualFold(info, SSLService) {
+                               hasSSL = true
+                               continue
+                       }
+                       // Unknown modifier: reject to avoid silently accepting 
typos.
+                       return -1
+               }
+
+               if hasSSL {
                        return BinaryTLSPort
                }
+               return BinaryPort
        case HTTPService:
+               // HTTP should not have any scheme modifiers; reject if present.
+               if len(serviceInfos) != 0 {
+                       return -1
+               }
                return HTTPPort
        case HTTPSService:
+               // HTTPS should not have any scheme modifiers; reject if 
present.
+               if len(serviceInfos) != 0 {
+                       return -1
+               }
                return HTTPSPort
        }
        return -1
diff --git a/pulsar/internal/service_uri_test.go 
b/pulsar/internal/service_uri_test.go
index 445b3254..1c6c1dbb 100644
--- a/pulsar/internal/service_uri_test.go
+++ b/pulsar/internal/service_uri_test.go
@@ -18,9 +18,11 @@
 package internal
 
 import (
+       "errors"
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 )
 
 func TestInvalidServiceUris(t *testing.T) {
@@ -31,6 +33,8 @@ func TestInvalidServiceUris(t *testing.T) {
                "pulsar://localhost:xyz/",        // invalid port
                "pulsar://localhost:-6650/",      // negative port
                "pulsar://fec0:0:0:ffff::1:6650", // missing brackets
+               "pulsar://[example]:6650",        // invalid hostname
+               "pulsar://fec0::1",               // missing brackets
        }
 
        for _, uri := range uris {
@@ -44,15 +48,41 @@ func TestEmptyServiceUriString(t *testing.T) {
        assert.NotNil(t, err)
 }
 
-func TestNullServiceUrlInstance(t *testing.T) {
-       u, err := NewPulsarServiceURIFromURL(nil)
-       assert.Nil(t, u)
-       assert.NotNil(t, err)
-}
-
 func TestMissingServiceName(t *testing.T) {
        serviceURI := "//localhost:6650/path/to/namespace"
-       assertServiceURI(t, serviceURI, "", nil, []string{"localhost:6650"}, 
"/path/to/namespace", "")
+       _, err := NewPulsarServiceURIFromURIString(serviceURI)
+       require.Error(t, err)
+}
+
+func TestUnsupportedServiceNameError(t *testing.T) {
+       _, err := NewPulsarServiceURIFromURIString("ftp://localhost:21";)
+       require.Error(t, err)
+
+       var unsupportedServiceNameErr *UnsupportedServiceNameError
+       require.ErrorAs(t, err, &unsupportedServiceNameErr)
+       assert.Equal(t, "ftp", unsupportedServiceNameErr.ServiceName)
+       assert.True(t, errors.As(err, &unsupportedServiceNameErr))
+}
+
+func TestIsHTTP(t *testing.T) {
+       testCases := []struct {
+               name     string
+               uri      string
+               expected bool
+       }{
+               {name: "pulsar", uri: "pulsar://localhost:6650", expected: 
false},
+               {name: "pulsar ssl", uri: "pulsar+ssl://localhost:6651", 
expected: false},
+               {name: "http", uri: "http://localhost";, expected: true},
+               {name: "https", uri: "https://localhost";, expected: true},
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       serviceURI, err := 
NewPulsarServiceURIFromURIString(tc.uri)
+                       require.NoError(t, err)
+                       assert.Equal(t, tc.expected, serviceURI.IsHTTP())
+               })
+       }
 }
 
 func TestEmptyPath(t *testing.T) {
@@ -143,6 +173,21 @@ func TestMultipleHostsMixed(t *testing.T) {
                "/path/to/namespace", "")
 }
 
+func TestPathQueryAndFragmentDelimitersDoNotSplitHosts(t *testing.T) {
+       serviceURI := 
"pulsar://host1:6650/path,with;delimiters?param=a,b;c#frag,ment;tail"
+       uri, err := NewPulsarServiceURIFromURIString(serviceURI)
+       require.NoError(t, err)
+       require.NotNil(t, uri)
+       assert.Equal(t, []string{"host1:6650"}, uri.ServiceHosts)
+       assert.Equal(t, "/path,with;delimiters", uri.servicePath)
+       assert.Equal(t, "param=a,b;c", uri.URL.RawQuery)
+       assert.Equal(t, "frag,ment;tail", uri.URL.Fragment)
+}
+
+func TestInvalidBracketedAdditionalHost(t *testing.T) {
+       testInvalidServiceURI(t, 
"pulsar://host1:6650,[example]:6650/path/to/namespace")
+}
+
 func TestUserInfoWithMultipleHosts(t *testing.T) {
        serviceURI := 
"pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace"
        assertServiceURI(t, serviceURI, "pulsar", nil, []string{"host1:6650", 
"host2:6650", "host3:6650"},


Reply via email to