This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dfe03c4fc1b [improve][client] PIP-234: Support sharing the memory
limit controller across multiple isolated Pulsar client instances (#25477)
dfe03c4fc1b is described below
commit dfe03c4fc1b0611fbc77698e5c0e328d7ef3242b
Author: Oneby Wang <[email protected]>
AuthorDate: Fri Apr 10 11:29:04 2026 +0800
[improve][client] PIP-234: Support sharing the memory limit controller
across multiple isolated Pulsar client instances (#25477)
---
.../client/impl/ConsumerMemoryLimitTest.java | 105 +++++++++++++++++++++
.../client/impl/ProducerMemoryLimitTest.java | 85 +++++++++++++++++
.../pulsar/client/impl/PulsarTestClient.java | 2 +-
.../pulsar/client/api/MemoryLimitConfig.java | 38 ++++++++
.../pulsar/client/api/OpenTelemetryConfig.java | 40 ++++++++
.../client/api/PulsarClientSharedResources.java | 11 ++-
.../api/PulsarClientSharedResourcesBuilder.java | 16 ++++
.../pulsar/client/impl/MemoryLimitController.java | 29 ++++--
.../pulsar/client/impl/PulsarClientImpl.java | 43 ++++++---
.../impl/PulsarClientResourcesConfigurer.java | 23 +++++
.../PulsarClientSharedResourcesBuilderImpl.java | 48 ++++++++++
.../impl/PulsarClientSharedResourcesImpl.java | 34 +++++++
12 files changed, 448 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
index f3bf80a646c..df4874317c4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerMemoryLimitTest.java
@@ -22,8 +22,11 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
import org.apache.pulsar.client.api.SizeUnit;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -92,4 +95,106 @@ public class ConsumerMemoryLimitTest extends
SharedPulsarBaseTest {
c1.receive();
}
}
+
+ @Test
+ public void testMultiPulsarClientConsumerShareMemoryLimitController()
throws Exception {
+ int msgSize = 100;
+ int msgCount = 3;
+ int memoryLimit = msgSize * msgCount;
+ String topic1 = newTopicName();
+ String topic2 = newTopicName();
+ PulsarClientSharedResources sharedResources =
PulsarClientSharedResources.builder()
+ .configureMemoryLimitController(
+ memoryLimitConfig ->
memoryLimitConfig.memoryLimit(memoryLimit, SizeUnit.BYTES)).build();
+ @Cleanup
+ PulsarClientImpl pulsarClient =
+ ((PulsarClientImpl)
PulsarClient.builder().serviceUrl(getBrokerServiceUrl()).build());
+ @Cleanup
+ PulsarClientImpl pulsarClient1 = ((PulsarClientImpl)
PulsarClient.builder().serviceUrl(getBrokerServiceUrl())
+ .sharedResources(sharedResources).build());
+ @Cleanup
+ PulsarClientImpl pulsarClient2 = ((PulsarClientImpl)
PulsarClient.builder().serviceUrl(getBrokerServiceUrl())
+ .sharedResources(sharedResources).build());
+
+ Assert.assertSame(pulsarClient1.getMemoryLimitController(),
pulsarClient2.getMemoryLimitController());
+
+ @Cleanup
+ Producer<byte[]> topic1Producer =
+
pulsarClient.newProducer().topic(topic1).enableBatching(false).blockIfQueueFull(false).create();
+ @Cleanup
+ Producer<byte[]> topic2Producer =
+
pulsarClient.newProducer().topic(topic2).enableBatching(false).blockIfQueueFull(false).create();
+ ConsumerImpl<byte[]> topic1Consumer =
+ (ConsumerImpl<byte[]>)
pulsarClient1.newConsumer().subscriptionName("topic1-sub").topic(topic1)
+ .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+ ConsumerImpl<byte[]> topic2Consumer =
+ (ConsumerImpl<byte[]>)
pulsarClient2.newConsumer().subscriptionName("topic2-sub").topic(topic2)
+ .autoScaledReceiverQueueSizeEnabled(true).subscribe();
+
+ MemoryLimitController memoryLimitController1 =
topic1Consumer.getMemoryLimitController().get();
+ MemoryLimitController memoryLimitController2 =
topic2Consumer.getMemoryLimitController().get();
+ Assert.assertSame(memoryLimitController1, memoryLimitController2);
+
+ Assert.assertEquals(topic1Consumer.getCurrentReceiverQueueSize(), 1);
+ Assert.assertEquals(topic2Consumer.getCurrentReceiverQueueSize(), 1);
+
+
+ topic1Producer.send(new byte[msgSize]);
+ Awaitility.await().until(topic1Consumer.scaleReceiverQueueHint::get);
+
+ Message<byte[]> topic1Message = topic1Consumer.receive();
+ Assert.assertNotNull(topic1Message);
+ Assert.assertEquals(topic1Consumer.getCurrentReceiverQueueSize(), 1);
+
+ // Trigger ConsumerBase.expectMoreIncomingMessages() method to expand
receiverQueueSize.
+ topic1Consumer.receiveAsync();
+ Awaitility.await().until(() ->
topic1Consumer.getCurrentReceiverQueueSize() == 2);
+
+
+ topic2Producer.send(new byte[msgSize]);
+ Awaitility.await().until(topic2Consumer.scaleReceiverQueueHint::get);
+
+ Message<byte[]> topic2Message = topic2Consumer.receive();
+ Assert.assertNotNull(topic2Message);
+ Assert.assertEquals(topic2Consumer.getCurrentReceiverQueueSize(), 1);
+
+ // Trigger ConsumerBase.expectMoreIncomingMessages() method to expand
receiverQueueSize.
+ topic2Consumer.receiveAsync();
+ Awaitility.await().until(() ->
topic2Consumer.getCurrentReceiverQueueSize() == 2);
+
+
+ // topic1Consumer.receiveAsync() will take one message, so we should
send (msgCount + 1) messages.
+ // Trigger ConsumerBase.reduceCurrentReceiverQueueSize() method to
reduce receiverQueueSize.
+ topic1Consumer.setCurrentReceiverQueueSize(msgCount);
+ for (int i = 0; i < msgCount + 1; i++) {
+ topic1Producer.send(new byte[msgSize]);
+ }
+ Awaitility.await().until(() -> memoryLimitController1.currentUsage()
== memoryLimit);
+ Awaitility.await().until(() ->
topic1Consumer.getCurrentReceiverQueueSize() == 1);
+ Awaitility.await().until(() ->
topic2Consumer.getCurrentReceiverQueueSize() == 1);
+
+ // topic2Consumer.receiveAsync() will take one message, so we should
send (msgCount + 1) messages.
+ for (int i = 0; i < msgCount + 1; i++) {
+ topic2Producer.send(new byte[msgSize]);
+ }
+ // topic2Consumer will not expand receiverQueueSize due to memory
limit reached.
+ for (int i = 0; i < msgCount; i++) {
+ topic2Message = topic2Consumer.receive();
+ Assert.assertNotNull(topic2Message);
+ Assert.assertEquals(topic2Consumer.getCurrentReceiverQueueSize(),
1);
+ }
+
+ // Close topic1Consumer to clear release memory.
+ topic1Consumer.close();
+ Assert.assertEquals(memoryLimitController1.currentUsage(), 0);
+
+ // Trigger ConsumerBase.expectMoreIncomingMessages() method to expand
receiverQueueSize.
+ topic2Consumer.receiveAsync();
+ Awaitility.await().until(() ->
topic2Consumer.getCurrentReceiverQueueSize() == 2);
+
+ // Close topic1Consumer to clear release memory.
+ topic2Consumer.close();
+ Assert.assertEquals(memoryLimitController2.currentUsage(), 0);
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 55a67ae644d..97c91c64667 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -30,10 +30,13 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
import org.apache.pulsar.client.api.SizeUnit;
+import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -229,6 +232,88 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
Assert.assertEquals(memoryLimitController.currentUsage(), 0);
}
+ @Test(timeOut = 15000)
+ public void testMultiPulsarClientProducerShareMemoryLimitController()
throws Exception {
+ int msgSize = 100;
+ int msgCount = 6;
+ int memoryLimit = msgSize * msgCount;
+ PulsarClientSharedResources sharedResources =
PulsarClientSharedResources.builder()
+ .configureMemoryLimitController(
+ memoryLimitConfig ->
memoryLimitConfig.memoryLimit(memoryLimit, SizeUnit.BYTES)).build();
+ @Cleanup
+ PulsarClientImpl pulsarClient1 = ((PulsarClientImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString())
+ .sharedResources(sharedResources).build());
+ @Cleanup
+ PulsarClientImpl pulsarClient2 = ((PulsarClientImpl)
PulsarClient.builder().serviceUrl(lookupUrl.toString())
+ .sharedResources(sharedResources).build());
+
+ Assert.assertSame(pulsarClient1.getMemoryLimitController(),
pulsarClient2.getMemoryLimitController());
+
+ Producer<byte[]> producer1 = pulsarClient1.newProducer()
+ .topic("testProducerShareMemoryLimitController")
+ .sendTimeout(20, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .blockIfQueueFull(false)
+ .enableBatching(false)
+ .create();
+ Producer<byte[]> producer2 = pulsarClient2.newProducer()
+ .topic("testProducerShareMemoryLimitController")
+ .sendTimeout(20, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .blockIfQueueFull(false)
+ .enableBatching(false)
+ .create();
+
+ this.stopBroker();
+
+ byte[] msgBytes = new byte[msgSize];
+ int halfMsgCount = msgCount / 2;
+ int producer1MemoryUsage = halfMsgCount * msgSize;
+ for (int i = 0; i < msgCount / 2; i++) {
+ producer1.sendAsync(msgBytes);
+ }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
pulsarClient1.getMemoryLimitController().currentUsage() ==
producer1MemoryUsage);
+
+ // The MemoryLimitController.tryReserveMemory() method returns false
only when currentUsage > memoryLimit.
+ int producer2MsgCount = halfMsgCount + 1;
+ int producer2MemoryUsage = producer2MsgCount * msgSize;
+ int totalMemoryUsage = producer1MemoryUsage + producer2MemoryUsage;
+ for (int i = 0; i < producer2MsgCount; i++) {
+ producer2.sendAsync(msgBytes);
+ }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
pulsarClient2.getMemoryLimitController().currentUsage() == totalMemoryUsage);
+
+ try {
+ producer1.send(msgBytes);
+ Assert.fail("producer can not send message due to memory limit
exceeded");
+ } catch (PulsarClientException.MemoryBufferIsFullError ex) {
+ long currentUsage =
pulsarClient1.getMemoryLimitController().currentUsage();
+ Assert.assertEquals(currentUsage, totalMemoryUsage);
+ }
+
+ try {
+ producer2.send(msgBytes);
+ Assert.fail("producer can not send message due to memory limit
exceeded");
+ } catch (PulsarClientException.MemoryBufferIsFullError ex) {
+ long currentUsage =
pulsarClient2.getMemoryLimitController().currentUsage();
+ Assert.assertEquals(currentUsage, totalMemoryUsage);
+ }
+
+ producer1.close();
+
Assert.assertEquals(pulsarClient1.getMemoryLimitController().currentUsage(),
producer2MemoryUsage);
+
+ for (int i = 0; i < halfMsgCount; i++) {
+ producer2.sendAsync(msgBytes);
+ }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
pulsarClient2.getMemoryLimitController().currentUsage() == totalMemoryUsage);
+
+ producer2.close();
+
Assert.assertEquals(pulsarClient1.getMemoryLimitController().currentUsage(), 0);
+ }
+
private void initClientWithMemoryLimit() throws PulsarClientException {
replacePulsarClient(PulsarClient.builder().
serviceUrl(lookupUrl.toString())
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index db330951f74..d72426e3d8d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -91,7 +91,7 @@ public class PulsarTestClient extends PulsarClientImpl {
private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool,
AtomicReference<Supplier<ClientCnx>>
clientCnxSupplierReference)
throws PulsarClientException {
- super(conf, eventLoopGroup, cnxPool, null, null, null, null, null, new
DnsResolverGroupImpl(conf));
+ super(conf, eventLoopGroup, cnxPool, null, null, null, null, null, new
DnsResolverGroupImpl(conf), null);
// workaround initialization order issue so that ClientCnx can be
created in this class
clientCnxSupplierReference.set(this::createClientCnx);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MemoryLimitConfig.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MemoryLimitConfig.java
new file mode 100644
index 00000000000..14a85ee305e
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MemoryLimitConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+/**
+ * Configuration interface for memory limit settings.
+ */
+public interface MemoryLimitConfig {
+
+ /**
+ * Configure a limit on the amount of direct memory that will be allocated
by this shared client instance.
+ * <p>
+ * See also {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
+ *
+ * @param memoryLimit the memory limit value, setting this to 0 will
disable the limit
+ * @param unit the memory limit size unit
+ * @return the memory limit configuration instance for chained calls
+ */
+ MemoryLimitConfig memoryLimit(long memoryLimit, SizeUnit unit);
+
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/OpenTelemetryConfig.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/OpenTelemetryConfig.java
new file mode 100644
index 00000000000..0b05f960041
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/OpenTelemetryConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+
+import io.opentelemetry.api.OpenTelemetry;
+
+/**
+ * Configuration interface for open telemetry settings.
+ */
+public interface OpenTelemetryConfig {
+
+ /**
+ * Configure OpenTelemetry for this shared client instance.
+ * <p>
+ * See also {@link ClientBuilder#openTelemetry(OpenTelemetry)}.
+ *
+ * @param openTelemetry the open telemetry instance
+ * @return the open telemetry configuration instance for chained calls
+ */
+ OpenTelemetryConfig openTelemetry(OpenTelemetry openTelemetry);
+
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
index 34a56ebd608..3b2db4e9233 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
@@ -71,7 +71,12 @@ public interface PulsarClientSharedResources extends
AutoCloseable {
// pulsar-lookup threadpool
LookupExecutor(SharedResourceType.ThreadPool),
// DNS resolver and cache that must be shared together with
eventLoopGroup
- DnsResolver(SharedResourceType.DnsResolver);
+ DnsResolver(SharedResourceType.DnsResolver),
+ // pulsar client global memory limit controller
+ MemoryLimitController(SharedResourceType.MemoryLimitController),
+ // pulsar client global open telemetry instance
+ OpenTelemetry(SharedResourceType.OpenTelemetry)
+ ;
private final SharedResourceType type;
@@ -88,7 +93,9 @@ public interface PulsarClientSharedResources extends
AutoCloseable {
EventLoopGroup,
ThreadPool,
Timer,
- DnsResolver;
+ DnsResolver,
+ MemoryLimitController,
+ OpenTelemetry;
}
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
index 3a94d5352b5..227f20da7ad 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
@@ -119,4 +119,20 @@ public interface PulsarClientSharedResourcesBuilder {
*/
PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig>
configurer);
+ /**
+ * Configures the memory limit settings.
+ *
+ * @param configurer a consumer that configures the memory limit settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder
configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer);
+
+ /**
+ * Configures the open telemetry settings.
+ *
+ * @param configurer a consumer that configures the open telemetry settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder
configureOpenTelemetry(Consumer<OpenTelemetryConfig> configurer);
+
}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index 4f8a6fdb06a..489562278b2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -29,22 +30,24 @@ public class MemoryLimitController {
private final long memoryLimit;
private final long triggerThreshold;
- private final Runnable trigger;
+ private final CopyOnWriteArraySet<Runnable> triggers = new
CopyOnWriteArraySet<>();
private final AtomicLong currentUsage = new AtomicLong();
private final ReentrantLock mutex = new ReentrantLock(false);
private final Condition condition = mutex.newCondition();
private final AtomicBoolean triggerRunning = new AtomicBoolean(false);
public MemoryLimitController(long memoryLimitBytes) {
- this.memoryLimit = memoryLimitBytes;
- triggerThreshold = 0;
- trigger = null;
+ this(memoryLimitBytes, 0);
}
- public MemoryLimitController(long memoryLimitBytes, long triggerThreshold,
Runnable trigger) {
+ public MemoryLimitController(long memoryLimitBytes, long triggerThreshold)
{
this.memoryLimit = memoryLimitBytes;
this.triggerThreshold = triggerThreshold;
- this.trigger = trigger;
+ }
+
+ public MemoryLimitController(long memoryLimitBytes, long triggerThreshold,
Runnable trigger) {
+ this(memoryLimitBytes, triggerThreshold);
+ this.triggers.add(trigger);
}
public void forceReserveMemory(long size) {
@@ -88,10 +91,12 @@ public class MemoryLimitController {
}
private void checkTrigger(long prevUsage, long newUsage) {
- if (newUsage >= triggerThreshold && prevUsage < triggerThreshold &&
trigger != null) {
+ if (newUsage >= triggerThreshold && prevUsage < triggerThreshold &&
!triggers.isEmpty()) {
if (triggerRunning.compareAndSet(false, true)) {
try {
- trigger.run();
+ for (Runnable trigger : triggers) {
+ trigger.run();
+ }
} finally {
triggerRunning.set(false);
}
@@ -152,4 +157,12 @@ public class MemoryLimitController {
public long memoryLimit() {
return memoryLimit;
}
+
+ public void registerTrigger(Runnable trigger) {
+ triggers.add(trigger);
+ }
+
+ public void deregisterTrigger(Runnable trigger) {
+ triggers.remove(trigger);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c1034925ce1..020786ee578 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -106,7 +106,7 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log =
LoggerFactory.getLogger(PulsarClientImpl.class);
private static final int CLOSE_TIMEOUT_SECONDS = 60;
- private static final double
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
+ protected static final double
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;
// default limits for producers when memory limit controller is disabled
private static final int NO_MEMORY_LIMIT_DEFAULT_MAX_PENDING_MESSAGES =
1000;
@@ -171,23 +171,25 @@ public class PulsarClientImpl implements PulsarClient {
@Getter
private TransactionCoordinatorClientImpl tcClient;
+ private final Runnable memoryLimitTrigger =
this::reduceConsumerReceiverQueueSize;
+
public PulsarClientImpl(ClientConfigurationData conf) throws
PulsarClientException {
- this(conf, null, null, null, null, null, null, null, null);
+ this(conf, null, null, null, null, null, null, null, null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
- this(conf, eventLoopGroup, null, null, null, null, null, null, null);
+ this(conf, eventLoopGroup, null, null, null, null, null, null, null,
null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, null, null, null, null, null,
null);
+ this(conf, eventLoopGroup, cnxPool, null, null, null, null, null,
null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool,
Timer timer)
throws PulsarClientException {
- this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null,
null);
+ this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null,
null, null);
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool connectionPool,
@@ -196,7 +198,7 @@ public class PulsarClientImpl implements PulsarClient {
ScheduledExecutorProvider
scheduledExecutorProvider)
throws PulsarClientException {
this(conf, eventLoopGroup, connectionPool, timer,
externalExecutorProvider, internalExecutorProvider,
- scheduledExecutorProvider, null, null);
+ scheduledExecutorProvider, null, null, null);
}
@Builder(builderClassName = "PulsarClientImplBuilder")
@@ -205,8 +207,8 @@ public class PulsarClientImpl implements PulsarClient {
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider,
ExecutorProvider lookupExecutorProvider,
- DnsResolverGroupImpl dnsResolverGroup) throws
PulsarClientException {
-
+ DnsResolverGroupImpl dnsResolverGroup,
+ MemoryLimitController memoryLimitController) throws
PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
@@ -286,14 +288,19 @@ public class PulsarClientImpl implements PulsarClient {
}
}
- memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes(),
- (long) (conf.getMemoryLimitBytes() *
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
- this::reduceConsumerReceiverQueueSize);
- // Only create memory buffer metrics if memory limiting is enabled
- if (memoryLimitController.isMemoryLimited()) {
- memoryBufferStats = new MemoryBufferStats(instrumentProvider,
memoryLimitController);
+ if (memoryLimitController == null) {
+ this.memoryLimitController = new
MemoryLimitController(conf.getMemoryLimitBytes(),
+ (long) (conf.getMemoryLimitBytes() *
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
+ this.memoryLimitTrigger);
+ } else {
+ this.memoryLimitController = memoryLimitController;
+
this.memoryLimitController.registerTrigger(this.memoryLimitTrigger);
+ }
+ // Only create memory buffer metrics if memory limit controller is
local and memory limiting is enabled.
+ if (memoryLimitController == null &&
this.memoryLimitController.isMemoryLimited()) {
+ this.memoryBufferStats = new
MemoryBufferStats(this.instrumentProvider, this.memoryLimitController);
} else {
- memoryBufferStats = null;
+ this.memoryBufferStats = null;
}
state.set(State.Open);
} catch (Throwable t) {
@@ -1003,6 +1010,7 @@ public class PulsarClientImpl implements PulsarClient {
} catch (PulsarClientException e) {
throwable = e;
}
+
if (memoryBufferStats != null) {
try {
memoryBufferStats.close();
@@ -1011,6 +1019,11 @@ public class PulsarClientImpl implements PulsarClient {
throwable = t;
}
}
+
+ if (memoryLimitController != null) {
+ memoryLimitController.deregisterTrigger(memoryLimitTrigger);
+ }
+
if (conf != null && conf.getAuthentication() != null) {
try {
conf.getAuthentication().close();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
index 28f97e92159..391cb93dba7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.client.impl;
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -146,4 +149,24 @@ class PulsarClientResourcesConfigurer {
}
return new DnsResolverGroupImpl(resourceConfig);
}
+
+ static MemoryLimitController createMemoryLimitControllerWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.MemoryLimitResourceConfig
resourceConfig) {
+ if (resourceConfig == null) {
+ resourceConfig = new
PulsarClientSharedResourcesBuilderImpl.MemoryLimitResourceConfig();
+ }
+ long memoryLimit = resourceConfig.memoryLimit;
+ // Keep the same logic with PulsarClientImpl's memory limit settings.
+ long triggerThreshold =
+ (long) (memoryLimit *
PulsarClientImpl.THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING);
+ return new MemoryLimitController(memoryLimit, triggerThreshold);
+ }
+
+ static InstrumentProvider createInstrumentProviderWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.OpenTelemetryResourceConfig
resourceConfig) {
+ // Use global openTelemetry instance if not configured.
+ OpenTelemetry openTelemetry =
+ Optional.ofNullable(resourceConfig).map(config ->
config.openTelemetry).orElse(null);
+ return new InstrumentProvider(openTelemetry);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
index f8ac98bb862..e2dc5995167 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import com.google.common.collect.Lists;
+import io.opentelemetry.api.OpenTelemetry;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
@@ -30,8 +31,11 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.client.api.DnsResolverConfig;
import org.apache.pulsar.client.api.EventLoopGroupConfig;
+import org.apache.pulsar.client.api.MemoryLimitConfig;
+import org.apache.pulsar.client.api.OpenTelemetryConfig;
import org.apache.pulsar.client.api.PulsarClientSharedResources;
import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.ThreadPoolConfig;
import org.apache.pulsar.client.api.TimerConfig;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -202,6 +206,34 @@ public class PulsarClientSharedResourcesBuilderImpl
implements PulsarClientShare
}
}
+ static class MemoryLimitResourceConfig implements ResourceConfig,
MemoryLimitConfig {
+ long memoryLimit;
+
+ public MemoryLimitResourceConfig() {
+ }
+
+ @Override
+ public MemoryLimitConfig memoryLimit(long memoryLimit, SizeUnit unit) {
+ this.memoryLimit = unit.toBytes(memoryLimit);
+ return this;
+ }
+
+ }
+
+ static class OpenTelemetryResourceConfig implements ResourceConfig,
OpenTelemetryConfig {
+ OpenTelemetry openTelemetry;
+
+ public OpenTelemetryResourceConfig() {
+ }
+
+ @Override
+ public OpenTelemetryConfig openTelemetry(OpenTelemetry openTelemetry) {
+ this.openTelemetry = openTelemetry;
+ return this;
+ }
+
+ }
+
@Override
public PulsarClientSharedResourcesBuilder resourceTypes(
PulsarClientSharedResources.SharedResource... sharedResource) {
@@ -242,6 +274,10 @@ public class PulsarClientSharedResourcesBuilderImpl
implements PulsarClientShare
return new ThreadPoolResourceConfig();
case Timer:
return new TimerResourceConfig();
+ case MemoryLimitController:
+ return new MemoryLimitResourceConfig();
+ case OpenTelemetry:
+ return new OpenTelemetryResourceConfig();
default:
throw new IllegalArgumentException("Unknown resource type:
" + sharedResource.getType());
}
@@ -277,6 +313,18 @@ public class PulsarClientSharedResourcesBuilderImpl
implements PulsarClientShare
return this;
}
+ @Override
+ public PulsarClientSharedResourcesBuilder
configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer) {
+
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.MemoryLimitController));
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder
configureOpenTelemetry(Consumer<OpenTelemetryConfig> configurer) {
+
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.OpenTelemetry));
+ return this;
+ }
+
@Override
public PulsarClientSharedResources build() {
return new PulsarClientSharedResourcesImpl(sharedResources,
resourceConfigs, shareConfigured);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
index 29953784681..d6eb459e23e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createDnsResolverGroupWithResourceConfig;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createEventLoopGroupWithResourceConfig;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createExternalExecutorProviderWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createInstrumentProviderWithResourceConfig;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createInternalExecutorProviderWithResourceConfig;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createLookupExecutorProviderWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createMemoryLimitControllerWithResourceConfig;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createScheduledExecutorProviderWithResourceConfig;
import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createTimer;
import io.netty.channel.EventLoopGroup;
@@ -31,12 +33,16 @@ import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.MemoryBufferStats;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+@Slf4j
@Getter
public class PulsarClientSharedResourcesImpl implements
PulsarClientSharedResources {
Set<SharedResource> sharedResources;
@@ -47,6 +53,9 @@ public class PulsarClientSharedResourcesImpl implements
PulsarClientSharedResour
private final Timer timer;
private final ExecutorProvider lookupExecutorProvider;
private final DnsResolverGroupImpl dnsResolverGroup;
+ private final MemoryLimitController memoryLimitController;
+ private final InstrumentProvider instrumentProvider;
+ private final MemoryBufferStats memoryBufferStats;
public PulsarClientSharedResourcesImpl(Set<SharedResource> sharedResources,
Map<SharedResource,
PulsarClientSharedResourcesBuilderImpl.ResourceConfig>
@@ -91,6 +100,21 @@ public class PulsarClientSharedResourcesImpl implements
PulsarClientSharedResour
? createDnsResolverGroupWithResourceConfig(
getResourceConfig(resourceConfigs, SharedResource.DnsResolver))
: null;
+ this.memoryLimitController =
this.sharedResources.contains(SharedResource.MemoryLimitController)
+ ? createMemoryLimitControllerWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.MemoryLimitController))
+ : null;
+ this.instrumentProvider =
this.sharedResources.contains(SharedResource.OpenTelemetry)
+ ? createInstrumentProviderWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.OpenTelemetry))
+ : null;
+ // Only create memory buffer metrics if memory limiting is enabled.
+ if (this.memoryLimitController != null &&
this.memoryLimitController.isMemoryLimited()
+ && this.instrumentProvider != null) {
+ this.memoryBufferStats = new
MemoryBufferStats(this.instrumentProvider, this.memoryLimitController);
+ } else {
+ this.memoryBufferStats = null;
+ }
}
@SuppressWarnings("unchecked")
@@ -132,6 +156,13 @@ public class PulsarClientSharedResourcesImpl implements
PulsarClientSharedResour
if (ioEventLoopGroup != null) {
EventLoopUtil.shutdownGracefully(ioEventLoopGroup);
}
+ if (memoryBufferStats != null) {
+ try {
+ memoryBufferStats.close();
+ } catch (Throwable t) {
+ log.warn("Failed to close shared memoryBufferStats", t);
+ }
+ }
}
public void applyTo(PulsarClientImpl.PulsarClientImplBuilder
instanceBuilder) {
@@ -156,5 +187,8 @@ public class PulsarClientSharedResourcesImpl implements
PulsarClientSharedResour
if (ioEventLoopGroup != null) {
instanceBuilder.eventLoopGroup(ioEventLoopGroup);
}
+ if (memoryLimitController != null) {
+ instanceBuilder.memoryLimitController(memoryLimitController);
+ }
}
}