This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dfe03c4fc1b [improve][client] PIP-234: Support sharing the memory 
limit controller across multiple isolated Pulsar client instances (#25477)
dfe03c4fc1b is described below

commit dfe03c4fc1b0611fbc77698e5c0e328d7ef3242b
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Apr 10 11:29:04 2026 +0800

    [improve][client] PIP-234: Support sharing the memory limit controller 
across multiple isolated Pulsar client instances (#25477)
---
 .../client/impl/ConsumerMemoryLimitTest.java       | 105 +++++++++++++++++++++
 .../client/impl/ProducerMemoryLimitTest.java       |  85 +++++++++++++++++
 .../pulsar/client/impl/PulsarTestClient.java       |   2 +-
 .../pulsar/client/api/MemoryLimitConfig.java       |  38 ++++++++
 .../pulsar/client/api/OpenTelemetryConfig.java     |  40 ++++++++
 .../client/api/PulsarClientSharedResources.java    |  11 ++-
 .../api/PulsarClientSharedResourcesBuilder.java    |  16 ++++
 .../pulsar/client/impl/MemoryLimitController.java  |  29 ++++--
 .../pulsar/client/impl/PulsarClientImpl.java       |  43 ++++++---
 .../impl/PulsarClientResourcesConfigurer.java      |  23 +++++
 .../PulsarClientSharedResourcesBuilderImpl.java    |  48 ++++++++++
 .../impl/PulsarClientSharedResourcesImpl.java      |  34 +++++++
 12 files changed, 448 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
index f3bf80a646c..df4874317c4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
@@ -22,8 +22,11 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -92,4 +95,106 @@ public class ConsumerMemoryLimitTest extends 
SharedPulsarBaseTest {
             c1.receive();
         }
     }
+
+    @Test
+    public void testMultiPulsarClientConsumerShareMemoryLimitController() 
throws Exception {
+        int msgSize = 100;
+        int msgCount = 3;
+        int memoryLimit = msgSize * msgCount;
+        String topic1 = newTopicName();
+        String topic2 = newTopicName();
+        PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder()
+                .configureMemoryLimitController(
+                        memoryLimitConfig -> 
memoryLimitConfig.memoryLimit(memoryLimit, SizeUnit.BYTES)).build();
+        @Cleanup
+        PulsarClientImpl pulsarClient =
+                ((PulsarClientImpl) 
PulsarClient.builder().serviceUrl(getBrokerServiceUrl()).build());
+        @Cleanup
+        PulsarClientImpl pulsarClient1 = ((PulsarClientImpl) 
PulsarClient.builder().serviceUrl(getBrokerServiceUrl())
+                .sharedResources(sharedResources).build());
+        @Cleanup
+        PulsarClientImpl pulsarClient2 = ((PulsarClientImpl) 
PulsarClient.builder().serviceUrl(getBrokerServiceUrl())
+                .sharedResources(sharedResources).build());
+
+        Assert.assertSame(pulsarClient1.getMemoryLimitController(), 
pulsarClient2.getMemoryLimitController());
+
+        @Cleanup
+        Producer<byte[]> topic1Producer =
+                
pulsarClient.newProducer().topic(topic1).enableBatching(false).blockIfQueueFull(false).create();
+        @Cleanup
+        Producer<byte[]> topic2Producer =
+                
pulsarClient.newProducer().topic(topic2).enableBatching(false).blockIfQueueFull(false).create();
+        ConsumerImpl<byte[]> topic1Consumer =
+                (ConsumerImpl<byte[]>) 
pulsarClient1.newConsumer().subscriptionName("topic1-sub").topic(topic1)
+                        .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+        ConsumerImpl<byte[]> topic2Consumer =
+                (ConsumerImpl<byte[]>) 
pulsarClient2.newConsumer().subscriptionName("topic2-sub").topic(topic2)
+                        .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+
+        MemoryLimitController memoryLimitController1 = 
topic1Consumer.getMemoryLimitController().get();
+        MemoryLimitController memoryLimitController2 = 
topic2Consumer.getMemoryLimitController().get();
+        Assert.assertSame(memoryLimitController1, memoryLimitController2);
+
+        Assert.assertEquals(topic1Consumer.getCurrentReceiverQueueSize(), 1);
+        Assert.assertEquals(topic2Consumer.getCurrentReceiverQueueSize(), 1);
+
+
+        topic1Producer.send(new byte[msgSize]);
+        Awaitility.await().until(topic1Consumer.scaleReceiverQueueHint::get);
+
+        Message<byte[]> topic1Message = topic1Consumer.receive();
+        Assert.assertNotNull(topic1Message);
+        Assert.assertEquals(topic1Consumer.getCurrentReceiverQueueSize(), 1);
+
+        // Trigger ConsumerBase.expectMoreIncomingMessages() method to expand 
receiverQueueSize.
+        topic1Consumer.receiveAsync();
+        Awaitility.await().until(() -> 
topic1Consumer.getCurrentReceiverQueueSize() == 2);
+
+
+        topic2Producer.send(new byte[msgSize]);
+        Awaitility.await().until(topic2Consumer.scaleReceiverQueueHint::get);
+
+        Message<byte[]> topic2Message = topic2Consumer.receive();
+        Assert.assertNotNull(topic2Message);
+        Assert.assertEquals(topic2Consumer.getCurrentReceiverQueueSize(), 1);
+
+        // Trigger ConsumerBase.expectMoreIncomingMessages() method to expand 
receiverQueueSize.
+        topic2Consumer.receiveAsync();
+        Awaitility.await().until(() -> 
topic2Consumer.getCurrentReceiverQueueSize() == 2);
+
+
+        // topic1Consumer.receiveAsync() will take one message, so we should 
send (msgCount + 1) messages.
+        // Trigger ConsumerBase.reduceCurrentReceiverQueueSize() method to 
reduce receiverQueueSize.
+        topic1Consumer.setCurrentReceiverQueueSize(msgCount);
+        for (int i = 0; i < msgCount + 1; i++) {
+            topic1Producer.send(new byte[msgSize]);
+        }
+        Awaitility.await().until(() -> memoryLimitController1.currentUsage() 
== memoryLimit);
+        Awaitility.await().until(() -> 
topic1Consumer.getCurrentReceiverQueueSize() == 1);
+        Awaitility.await().until(() -> 
topic2Consumer.getCurrentReceiverQueueSize() == 1);
+
+        // topic2Consumer.receiveAsync() will take one message, so we should 
send (msgCount + 1) messages.
+        for (int i = 0; i < msgCount + 1; i++) {
+            topic2Producer.send(new byte[msgSize]);
+        }
+        // topic2Consumer will not expand receiverQueueSize due to memory 
limit reached.
+        for (int i = 0; i < msgCount; i++) {
+            topic2Message = topic2Consumer.receive();
+            Assert.assertNotNull(topic2Message);
+            Assert.assertEquals(topic2Consumer.getCurrentReceiverQueueSize(), 
1);
+        }
+
+        // Close topic1Consumer to release memory.
+        topic1Consumer.close();
+        Assert.assertEquals(memoryLimitController1.currentUsage(), 0);
+
+        // Trigger ConsumerBase.expectMoreIncomingMessages() method to expand 
receiverQueueSize.
+        topic2Consumer.receiveAsync();
+        Awaitility.await().until(() -> 
topic2Consumer.getCurrentReceiverQueueSize() == 2);
+
+        // Close topic2Consumer to release memory.
+        topic2Consumer.close();
+        Assert.assertEquals(memoryLimitController2.currentUsage(), 0);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 55a67ae644d..97c91c64667 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -30,10 +30,13 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.api.SizeUnit;
+import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -229,6 +232,88 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
         Assert.assertEquals(memoryLimitController.currentUsage(), 0);
     }
 
+    @Test(timeOut = 15000)
+    public void testMultiPulsarClientProducerShareMemoryLimitController() 
throws Exception {
+        int msgSize = 100;
+        int msgCount = 6;
+        int memoryLimit = msgSize * msgCount;
+        PulsarClientSharedResources sharedResources = 
PulsarClientSharedResources.builder()
+                .configureMemoryLimitController(
+                        memoryLimitConfig -> 
memoryLimitConfig.memoryLimit(memoryLimit, SizeUnit.BYTES)).build();
+        @Cleanup
+        PulsarClientImpl pulsarClient1 = ((PulsarClientImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString())
+                .sharedResources(sharedResources).build());
+        @Cleanup
+        PulsarClientImpl pulsarClient2 = ((PulsarClientImpl) 
PulsarClient.builder().serviceUrl(lookupUrl.toString())
+                .sharedResources(sharedResources).build());
+
+        Assert.assertSame(pulsarClient1.getMemoryLimitController(), 
pulsarClient2.getMemoryLimitController());
+
+        Producer<byte[]> producer1 = pulsarClient1.newProducer()
+                .topic("testProducerShareMemoryLimitController")
+                .sendTimeout(20, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .blockIfQueueFull(false)
+                .enableBatching(false)
+                .create();
+        Producer<byte[]> producer2 = pulsarClient2.newProducer()
+                .topic("testProducerShareMemoryLimitController")
+                .sendTimeout(20, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .blockIfQueueFull(false)
+                .enableBatching(false)
+                .create();
+
+        this.stopBroker();
+
+        byte[] msgBytes = new byte[msgSize];
+        int halfMsgCount = msgCount / 2;
+        int producer1MemoryUsage = halfMsgCount * msgSize;
+        for (int i = 0; i < msgCount / 2; i++) {
+            producer1.sendAsync(msgBytes);
+        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsarClient1.getMemoryLimitController().currentUsage() == 
producer1MemoryUsage);
+
+        // The MemoryLimitController.tryReserveMemory() method returns false 
only when currentUsage > memoryLimit.
+        int producer2MsgCount = halfMsgCount + 1;
+        int producer2MemoryUsage = producer2MsgCount * msgSize;
+        int totalMemoryUsage = producer1MemoryUsage + producer2MemoryUsage;
+        for (int i = 0; i < producer2MsgCount; i++) {
+            producer2.sendAsync(msgBytes);
+        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsarClient2.getMemoryLimitController().currentUsage() == totalMemoryUsage);
+
+        try {
+            producer1.send(msgBytes);
+            Assert.fail("producer can not send message due to memory limit 
exceeded");
+        } catch (PulsarClientException.MemoryBufferIsFullError ex) {
+            long currentUsage = 
pulsarClient1.getMemoryLimitController().currentUsage();
+            Assert.assertEquals(currentUsage, totalMemoryUsage);
+        }
+
+        try {
+            producer2.send(msgBytes);
+            Assert.fail("producer can not send message due to memory limit 
exceeded");
+        } catch (PulsarClientException.MemoryBufferIsFullError ex) {
+            long currentUsage = 
pulsarClient2.getMemoryLimitController().currentUsage();
+            Assert.assertEquals(currentUsage, totalMemoryUsage);
+        }
+
+        producer1.close();
+        
Assert.assertEquals(pulsarClient1.getMemoryLimitController().currentUsage(), 
producer2MemoryUsage);
+
+        for (int i = 0; i < halfMsgCount; i++) {
+            producer2.sendAsync(msgBytes);
+        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> 
pulsarClient2.getMemoryLimitController().currentUsage() == totalMemoryUsage);
+
+        producer2.close();
+        
Assert.assertEquals(pulsarClient1.getMemoryLimitController().currentUsage(), 0);
+    }
+
     private void initClientWithMemoryLimit() throws PulsarClientException {
         replacePulsarClient(PulsarClient.builder().
                 serviceUrl(lookupUrl.toString())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index db330951f74..d72426e3d8d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -91,7 +91,7 @@ public class PulsarTestClient extends PulsarClientImpl {
     private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool,
                              AtomicReference<Supplier<ClientCnx>> 
clientCnxSupplierReference)
             throws PulsarClientException {
-        super(conf, eventLoopGroup, cnxPool, null, null, null, null, null, new 
DnsResolverGroupImpl(conf));
+        super(conf, eventLoopGroup, cnxPool, null, null, null, null, null, new 
DnsResolverGroupImpl(conf), null);
         // workaround initialization order issue so that ClientCnx can be 
created in this class
         clientCnxSupplierReference.set(this::createClientCnx);
     }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MemoryLimitConfig.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MemoryLimitConfig.java
new file mode 100644
index 00000000000..14a85ee305e
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MemoryLimitConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+/**
+ * Configuration interface for memory limit settings.
+ */
+public interface MemoryLimitConfig {
+
+    /**
+     * Configure a limit on the amount of direct memory that will be allocated 
by this shared client instance.
+     * <p>
+     * See also {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
+     *
+     * @param memoryLimit the memory limit value, setting this to 0 will 
disable the limit
+     * @param unit        the memory limit size unit
+     * @return the memory limit configuration instance for chained calls
+     */
+    MemoryLimitConfig memoryLimit(long memoryLimit, SizeUnit unit);
+
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/OpenTelemetryConfig.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/OpenTelemetryConfig.java
new file mode 100644
index 00000000000..0b05f960041
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/OpenTelemetryConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+
+import io.opentelemetry.api.OpenTelemetry;
+
+/**
+ * Configuration interface for OpenTelemetry settings.
+ */
+public interface OpenTelemetryConfig {
+
+    /**
+     * Configure OpenTelemetry for this shared client instance.
+     * <p>
+     * See also {@link ClientBuilder#openTelemetry(OpenTelemetry)}.
+     *
+     * @param openTelemetry the open telemetry instance
+     * @return the open telemetry configuration instance for chained calls
+     */
+    OpenTelemetryConfig openTelemetry(OpenTelemetry openTelemetry);
+
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
index 34a56ebd608..3b2db4e9233 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
@@ -71,7 +71,12 @@ public interface PulsarClientSharedResources extends 
AutoCloseable {
         // pulsar-lookup threadpool
         LookupExecutor(SharedResourceType.ThreadPool),
         // DNS resolver and cache that must be shared together with 
eventLoopGroup
-        DnsResolver(SharedResourceType.DnsResolver);
+        DnsResolver(SharedResourceType.DnsResolver),
+        // pulsar client global memory limit controller
+        MemoryLimitController(SharedResourceType.MemoryLimitController),
+        // pulsar client global open telemetry instance
+        OpenTelemetry(SharedResourceType.OpenTelemetry)
+        ;
 
         private final SharedResourceType type;
 
@@ -88,7 +93,9 @@ public interface PulsarClientSharedResources extends 
AutoCloseable {
         EventLoopGroup,
         ThreadPool,
         Timer,
-        DnsResolver;
+        DnsResolver,
+        MemoryLimitController,
+        OpenTelemetry;
     }
 
     /**
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
index 3a94d5352b5..227f20da7ad 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
@@ -119,4 +119,20 @@ public interface PulsarClientSharedResourcesBuilder {
      */
     PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> 
configurer);
 
+    /**
+     * Configures the memory limit settings.
+     *
+     * @param configurer a consumer that configures the memory limit settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder 
configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer);
+
+    /**
+     * Configures the open telemetry settings.
+     *
+     * @param configurer a consumer that configures the open telemetry settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder 
configureOpenTelemetry(Consumer<OpenTelemetryConfig> configurer);
+
 }
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index 4f8a6fdb06a..489562278b2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -29,22 +30,24 @@ public class MemoryLimitController {
 
     private final long memoryLimit;
     private final long triggerThreshold;
-    private final Runnable trigger;
+    private final CopyOnWriteArraySet<Runnable> triggers = new 
CopyOnWriteArraySet<>();
     private final AtomicLong currentUsage = new AtomicLong();
     private final ReentrantLock mutex = new ReentrantLock(false);
     private final Condition condition = mutex.newCondition();
     private final AtomicBoolean triggerRunning = new AtomicBoolean(false);
 
     public MemoryLimitController(long memoryLimitBytes) {
-        this.memoryLimit = memoryLimitBytes;
-        triggerThreshold = 0;
-        trigger = null;
+        this(memoryLimitBytes, 0);
     }
 
-    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, 
Runnable trigger) {
+    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold) 
{
         this.memoryLimit = memoryLimitBytes;
         this.triggerThreshold = triggerThreshold;
-        this.trigger = trigger;
+    }
+
+    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, 
Runnable trigger) {
+        this(memoryLimitBytes, triggerThreshold);
+        this.triggers.add(trigger);
     }
 
     public void forceReserveMemory(long size) {
@@ -88,10 +91,12 @@ public class MemoryLimitController {
     }
 
     private void checkTrigger(long prevUsage, long newUsage) {
-        if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && 
trigger != null) {
+        if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && 
!triggers.isEmpty()) {
             if (triggerRunning.compareAndSet(false, true)) {
                 try {
-                    trigger.run();
+                    for (Runnable trigger : triggers) {
+                        trigger.run();
+                    }
                 } finally {
                     triggerRunning.set(false);
                 }
@@ -152,4 +157,12 @@ public class MemoryLimitController {
     public long memoryLimit() {
         return memoryLimit;
     }
+
+    public void registerTrigger(Runnable trigger) {
+        triggers.add(trigger);
+    }
+
+    public void deregisterTrigger(Runnable trigger) {
+        triggers.remove(trigger);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c1034925ce1..020786ee578 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -106,7 +106,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClientImpl.class);
     private static final int CLOSE_TIMEOUT_SECONDS = 60;
-    private static final double 
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
+    protected static final double 
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
 
     // default limits for producers when memory limit controller is disabled
     private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES = 
1000;
@@ -171,23 +171,25 @@ public class PulsarClientImpl implements PulsarClient {
     @Getter
     private TransactionCoordinatorClientImpl tcClient;
 
+    private final Runnable memoryLimitTrigger = 
this::reduceConsumerReceiverQueueSize;
+
     public PulsarClientImpl(ClientConfigurationData conf) throws 
PulsarClientException {
-        this(conf, null, null, null, null, null, null, null, null);
+        this(conf, null, null, null, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, null, null, null, null, null, null, null);
+        this(conf, eventLoopGroup, null, null, null, null, null, null, null, 
null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, null, null, null, null, 
null);
+        this(conf, eventLoopGroup, cnxPool, null, null, null, null, null, 
null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool cnxPool,
                             Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null, 
null);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null, 
null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, ConnectionPool connectionPool,
@@ -196,7 +198,7 @@ public class PulsarClientImpl implements PulsarClient {
                             ScheduledExecutorProvider 
scheduledExecutorProvider)
             throws PulsarClientException {
         this(conf, eventLoopGroup, connectionPool, timer, 
externalExecutorProvider, internalExecutorProvider,
-                scheduledExecutorProvider, null, null);
+                scheduledExecutorProvider, null, null, null);
     }
 
     @Builder(builderClassName = "PulsarClientImplBuilder")
@@ -205,8 +207,8 @@ public class PulsarClientImpl implements PulsarClient {
                      ExecutorProvider internalExecutorProvider,
                      ScheduledExecutorProvider scheduledExecutorProvider,
                      ExecutorProvider lookupExecutorProvider,
-                     DnsResolverGroupImpl dnsResolverGroup) throws 
PulsarClientException {
-
+                     DnsResolverGroupImpl dnsResolverGroup,
+                     MemoryLimitController memoryLimitController) throws 
PulsarClientException {
         EventLoopGroup eventLoopGroupReference = null;
         ConnectionPool connectionPoolReference = null;
         try {
@@ -286,14 +288,19 @@ public class PulsarClientImpl implements PulsarClient {
                 }
             }
 
-            memoryLimitController = new 
MemoryLimitController(conf.getMemoryLimitBytes(),
-                    (long) (conf.getMemoryLimitBytes() * 
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
-                    this::reduceConsumerReceiverQueueSize);
-            // Only create memory buffer metrics if memory limiting is enabled
-            if (memoryLimitController.isMemoryLimited()) {
-                memoryBufferStats = new MemoryBufferStats(instrumentProvider, 
memoryLimitController);
+            if (memoryLimitController == null) {
+                this.memoryLimitController = new 
MemoryLimitController(conf.getMemoryLimitBytes(),
+                        (long) (conf.getMemoryLimitBytes() * 
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
+                        this.memoryLimitTrigger);
+            } else {
+                this.memoryLimitController = memoryLimitController;
+                
this.memoryLimitController.registerTrigger(this.memoryLimitTrigger);
+            }
+            // Only create memory buffer metrics if memory limit controller is 
local and memory limiting is enabled.
+            if (memoryLimitController == null && 
this.memoryLimitController.isMemoryLimited()) {
+                this.memoryBufferStats = new 
MemoryBufferStats(this.instrumentProvider, this.memoryLimitController);
             } else {
-                memoryBufferStats = null;
+                this.memoryBufferStats = null;
             }
             state.set(State.Open);
         } catch (Throwable t) {
@@ -1003,6 +1010,7 @@ public class PulsarClientImpl implements PulsarClient {
             } catch (PulsarClientException e) {
                 throwable = e;
             }
+
             if (memoryBufferStats != null) {
                 try {
                     memoryBufferStats.close();
@@ -1011,6 +1019,11 @@ public class PulsarClientImpl implements PulsarClient {
                     throwable = t;
                 }
             }
+
+            if (memoryLimitController != null) {
+                memoryLimitController.deregisterTrigger(memoryLimitTrigger);
+            }
+
             if (conf != null && conf.getAuthentication() != null) {
                 try {
                     conf.getAuthentication().close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
index 28f97e92159..391cb93dba7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.client.impl;
 
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.Optional;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -146,4 +149,24 @@ class PulsarClientResourcesConfigurer {
         }
         return new DnsResolverGroupImpl(resourceConfig);
     }
+
+    /**
+     * Creates a {@link MemoryLimitController} from the given memory limit configuration.
+     *
+     * @param resourceConfig the memory limit settings; when {@code null}, a newly constructed
+     *                       {@code MemoryLimitResourceConfig} is used in its place
+     * @return a controller built from the configured limit and the trigger threshold derived from it
+     */
+    static MemoryLimitController createMemoryLimitControllerWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.MemoryLimitResourceConfig resourceConfig) {
+        PulsarClientSharedResourcesBuilderImpl.MemoryLimitResourceConfig effectiveConfig =
+                resourceConfig != null
+                        ? resourceConfig
+                        : new PulsarClientSharedResourcesBuilderImpl.MemoryLimitResourceConfig();
+        long limitBytes = effectiveConfig.memoryLimit;
+        // Derive the trigger threshold the same way PulsarClientImpl does for its memory limit settings.
+        double shrinkRatio = PulsarClientImpl.THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING;
+        long thresholdBytes = (long) (limitBytes * shrinkRatio);
+        return new MemoryLimitController(limitBytes, thresholdBytes);
+    }
+
+    /**
+     * Creates an {@link InstrumentProvider} from the given OpenTelemetry configuration.
+     *
+     * @param resourceConfig the OpenTelemetry settings; may be {@code null}
+     * @return a provider constructed with the configured {@link OpenTelemetry}, or with {@code null}
+     *         when no config or no instance was supplied
+     */
+    static InstrumentProvider createInstrumentProviderWithResourceConfig(
+            PulsarClientSharedResourcesBuilderImpl.OpenTelemetryResourceConfig resourceConfig) {
+        // Nothing configured: pass null so that the global openTelemetry instance is used.
+        OpenTelemetry configuredOpenTelemetry = null;
+        if (resourceConfig != null) {
+            configuredOpenTelemetry = resourceConfig.openTelemetry;
+        }
+        return new InstrumentProvider(configuredOpenTelemetry);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
index f8ac98bb862..e2dc5995167 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Lists;
+import io.opentelemetry.api.OpenTelemetry;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,8 +31,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.apache.pulsar.client.api.DnsResolverConfig;
 import org.apache.pulsar.client.api.EventLoopGroupConfig;
+import org.apache.pulsar.client.api.MemoryLimitConfig;
+import org.apache.pulsar.client.api.OpenTelemetryConfig;
 import org.apache.pulsar.client.api.PulsarClientSharedResources;
 import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.ThreadPoolConfig;
 import org.apache.pulsar.client.api.TimerConfig;
 import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -202,6 +206,34 @@ public class PulsarClientSharedResourcesBuilderImpl 
implements PulsarClientShare
         }
     }
 
+    /**
+     * {@link ResourceConfig} implementation of {@link MemoryLimitConfig} that records the
+     * memory limit chosen for the shared {@code MemoryLimitController} resource.
+     */
+    static class MemoryLimitResourceConfig implements ResourceConfig, 
MemoryLimitConfig {
+        /** Configured limit in bytes; stays at the {@code long} default of 0 until set. */
+        long memoryLimit;
+
+        public MemoryLimitResourceConfig() {
+        }
+
+        /**
+         * Stores the limit after converting it to bytes with {@link SizeUnit#toBytes(long)}.
+         *
+         * @param memoryLimit the limit value expressed in {@code unit}
+         * @param unit the unit in which {@code memoryLimit} is given
+         * @return this config, so calls can be chained
+         */
+        @Override
+        public MemoryLimitConfig memoryLimit(long memoryLimit, SizeUnit unit) {
+            this.memoryLimit = unit.toBytes(memoryLimit);
+            return this;
+        }
+
+    }
+
+    /**
+     * {@link ResourceConfig} implementation of {@link OpenTelemetryConfig} that records the
+     * {@link OpenTelemetry} instance chosen for the shared {@code OpenTelemetry} resource.
+     */
+    static class OpenTelemetryResourceConfig implements ResourceConfig, 
OpenTelemetryConfig {
+        /** Configured instance; remains {@code null} until {@link #openTelemetry(OpenTelemetry)} is called. */
+        OpenTelemetry openTelemetry;
+
+        public OpenTelemetryResourceConfig() {
+        }
+
+        /**
+         * Stores the given {@link OpenTelemetry} instance as-is (no null check is performed).
+         *
+         * @param openTelemetry the instance to record
+         * @return this config, so calls can be chained
+         */
+        @Override
+        public OpenTelemetryConfig openTelemetry(OpenTelemetry openTelemetry) {
+            this.openTelemetry = openTelemetry;
+            return this;
+        }
+
+    }
+
     @Override
     public PulsarClientSharedResourcesBuilder resourceTypes(
             PulsarClientSharedResources.SharedResource... sharedResource) {
@@ -242,6 +274,10 @@ public class PulsarClientSharedResourcesBuilderImpl 
implements PulsarClientShare
                     return new ThreadPoolResourceConfig();
                 case Timer:
                     return new TimerResourceConfig();
+                case MemoryLimitController:
+                    return new MemoryLimitResourceConfig();
+                case OpenTelemetry:
+                    return new OpenTelemetryResourceConfig();
                 default:
                     throw new IllegalArgumentException("Unknown resource type: 
" + sharedResource.getType());
             }
@@ -277,6 +313,18 @@ public class PulsarClientSharedResourcesBuilderImpl 
implements PulsarClientShare
         return this;
     }
 
+    /**
+     * Lets the caller customize the configuration of the shared memory limit controller.
+     *
+     * @param configurer callback that receives the {@link MemoryLimitConfig} to modify
+     * @return this builder, so calls can be chained
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder 
configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer) {
+        // Hand the config for the MemoryLimitController resource to the callback.
+        // NOTE(review): getOrCreateConfig presumably creates the config on first use - confirm.
+        
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.MemoryLimitController));
+        return this;
+    }
+
+    /**
+     * Lets the caller customize the configuration of the shared OpenTelemetry resource.
+     *
+     * @param configurer callback that receives the {@link OpenTelemetryConfig} to modify
+     * @return this builder, so calls can be chained
+     */
+    @Override
+    public PulsarClientSharedResourcesBuilder 
configureOpenTelemetry(Consumer<OpenTelemetryConfig> configurer) {
+        // Hand the config for the OpenTelemetry resource to the callback.
+        // NOTE(review): getOrCreateConfig presumably creates the config on first use - confirm.
+        
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.OpenTelemetry));
+        return this;
+    }
+
     @Override
     public PulsarClientSharedResources build() {
         return new PulsarClientSharedResourcesImpl(sharedResources, 
resourceConfigs, shareConfigured);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
index 29953784681..d6eb459e23e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.client.impl;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createDnsResolverGroupWithResourceConfig;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createEventLoopGroupWithResourceConfig;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createExternalExecutorProviderWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createInstrumentProviderWithResourceConfig;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createInternalExecutorProviderWithResourceConfig;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createLookupExecutorProviderWithResourceConfig;
+import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createMemoryLimitControllerWithResourceConfig;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createScheduledExecutorProviderWithResourceConfig;
 import static 
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createTimer;
 import io.netty.channel.EventLoopGroup;
@@ -31,12 +33,16 @@ import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.MemoryBufferStats;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 
+@Slf4j
 @Getter
 public class PulsarClientSharedResourcesImpl implements 
PulsarClientSharedResources {
     Set<SharedResource> sharedResources;
@@ -47,6 +53,9 @@ public class PulsarClientSharedResourcesImpl implements 
PulsarClientSharedResour
     private final Timer timer;
     private final ExecutorProvider lookupExecutorProvider;
     private final DnsResolverGroupImpl dnsResolverGroup;
+    private final MemoryLimitController memoryLimitController;
+    private final InstrumentProvider instrumentProvider;
+    private final MemoryBufferStats memoryBufferStats;
 
     public PulsarClientSharedResourcesImpl(Set<SharedResource> sharedResources,
                                            Map<SharedResource, 
PulsarClientSharedResourcesBuilderImpl.ResourceConfig>
@@ -91,6 +100,21 @@ public class PulsarClientSharedResourcesImpl implements 
PulsarClientSharedResour
                 ? createDnsResolverGroupWithResourceConfig(
                 getResourceConfig(resourceConfigs, SharedResource.DnsResolver))
                 : null;
+        this.memoryLimitController = 
this.sharedResources.contains(SharedResource.MemoryLimitController)
+                ? createMemoryLimitControllerWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.MemoryLimitController))
+                : null;
+        this.instrumentProvider = 
this.sharedResources.contains(SharedResource.OpenTelemetry)
+                ? createInstrumentProviderWithResourceConfig(
+                getResourceConfig(resourceConfigs, 
SharedResource.OpenTelemetry))
+                : null;
+        // Only create memory buffer metrics if memory limiting is enabled.
+        if (this.memoryLimitController != null && 
this.memoryLimitController.isMemoryLimited()
+                && this.instrumentProvider != null) {
+            this.memoryBufferStats = new 
MemoryBufferStats(this.instrumentProvider, this.memoryLimitController);
+        } else {
+            this.memoryBufferStats = null;
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -132,6 +156,13 @@ public class PulsarClientSharedResourcesImpl implements 
PulsarClientSharedResour
         if (ioEventLoopGroup != null) {
             EventLoopUtil.shutdownGracefully(ioEventLoopGroup);
         }
+        if (memoryBufferStats != null) {
+            try {
+                memoryBufferStats.close();
+            } catch (Throwable t) {
+                log.warn("Failed to close shared memoryBufferStats", t);
+            }
+        }
     }
 
     public void applyTo(PulsarClientImpl.PulsarClientImplBuilder 
instanceBuilder) {
@@ -156,5 +187,8 @@ public class PulsarClientSharedResourcesImpl implements 
PulsarClientSharedResour
         if (ioEventLoopGroup != null) {
             instanceBuilder.eventLoopGroup(ioEventLoopGroup);
         }
+        if (memoryLimitController != null) {
+            instanceBuilder.memoryLimitController(memoryLimitController);
+        }
     }
 }


Reply via email to