Copilot commented on code in PR #25477:
URL: https://github.com/apache/pulsar/pull/25477#discussion_r3044160452
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java:
##########
@@ -119,4 +120,20 @@ PulsarClientSharedResourcesBuilder configureThreadPool(SharedResource sharedReso
*/
PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> configurer);
+ /**
+ * Configures the memory limit settings.
+ *
+ * @param configurer a consumer that configures the memory limit settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer);
+
+ /**
+ * Configures the open telemetry settings.
+ *
+ * @param configurer a consumer that configures the open telemetry settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder configureOpenTelemetry(Consumer<OpenTelemetry> configurer);
Review Comment:
`configureOpenTelemetry` takes `Consumer<OpenTelemetry>`, but the config model
added in this PR is `OpenTelemetryConfig` (the builder impl stores an
`OpenTelemetry` inside a config object). As written, callers cannot configure
the resource through the config interface, and the impl is forced to pass a
config object to a consumer of a different type. Change the signature to accept
the config type (e.g., `Consumer<OpenTelemetryConfig>`), matching
`configureMemoryLimitController` and the new `OpenTelemetryConfig` interface.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java:
##########
@@ -152,4 +157,12 @@ public boolean isMemoryLimited() {
public long memoryLimit() {
return memoryLimit;
}
+
+ public void registerTrigger(Runnable trigger) {
+ triggers.add(trigger);
Review Comment:
When sharing a controller, a client can register its trigger while
`currentUsage` is already >= `triggerThreshold`. With the current
edge-triggered design (only fires on threshold crossing), newly registered
triggers may never run until usage drops below the threshold and crosses again,
so late-joining clients may not shrink receiver queues under existing memory
pressure. Consider invoking the newly registered trigger immediately (guarded
by `triggerRunning`) when `currentUsage.get() >= triggerThreshold`, or
otherwise ensuring registration can prompt a shrink when already above
threshold.
```suggestion
        triggers.add(trigger);
        if (currentUsage.get() >= triggerThreshold
                && triggerRunning.compareAndSet(false, true)) {
            try {
                trigger.run();
            } finally {
                triggerRunning.set(false);
            }
        }
```
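To make the late-registration case concrete, here is a minimal, self-contained sketch. `EdgeTriggeredLimiter` and its `reserve` method are hypothetical stand-ins, and the threshold-crossing check is an assumption about how an edge-triggered design behaves, not the actual `MemoryLimitController` code.

```java
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in, not the Pulsar class.
final class EdgeTriggeredLimiter {
    private final long triggerThreshold;
    private final AtomicLong currentUsage = new AtomicLong();
    private final AtomicBoolean triggerRunning = new AtomicBoolean(false);
    private final CopyOnWriteArraySet<Runnable> triggers = new CopyOnWriteArraySet<>();

    EdgeTriggeredLimiter(long triggerThreshold) {
        this.triggerThreshold = triggerThreshold;
    }

    // Assumed edge-triggered behavior: fire only when this reservation
    // moves usage from below the threshold to at or above it.
    void reserve(long size) {
        long newUsage = currentUsage.addAndGet(size);
        long oldUsage = newUsage - size;
        if (oldUsage < triggerThreshold && newUsage >= triggerThreshold) {
            runGuarded(() -> triggers.forEach(Runnable::run));
        }
    }

    // Registration also checks the current level, so a client that joins
    // while usage is already high still gets one immediate callback.
    void registerTrigger(Runnable trigger) {
        triggers.add(trigger);
        if (currentUsage.get() >= triggerThreshold) {
            runGuarded(trigger);
        }
    }

    // Skips the action if another trigger run is already in progress.
    private void runGuarded(Runnable action) {
        if (triggerRunning.compareAndSet(false, true)) {
            try {
                action.run();
            } finally {
                triggerRunning.set(false);
            }
        }
    }
}
```

Without the check in `registerTrigger`, a trigger added after `reserve` has pushed usage past the threshold would stay idle until usage fell below the threshold and crossed it again.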
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java:
##########
@@ -29,22 +30,24 @@ public class MemoryLimitController {
private final long memoryLimit;
private final long triggerThreshold;
- private final Runnable trigger;
+ private final CopyOnWriteArraySet<Runnable> triggers = new CopyOnWriteArraySet<>();
private final AtomicLong currentUsage = new AtomicLong();
private final ReentrantLock mutex = new ReentrantLock(false);
private final Condition condition = mutex.newCondition();
private final AtomicBoolean triggerRunning = new AtomicBoolean(false);
public MemoryLimitController(long memoryLimitBytes) {
- this.memoryLimit = memoryLimitBytes;
- triggerThreshold = 0;
- trigger = null;
+ this(memoryLimitBytes, 0);
}
- public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
+ public MemoryLimitController(long memoryLimitBytes, long triggerThreshold) {
this.memoryLimit = memoryLimitBytes;
this.triggerThreshold = triggerThreshold;
- this.trigger = trigger;
+ }
+
+ public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
+ this(memoryLimitBytes, triggerThreshold);
+ this.triggers.add(trigger);
}
Review Comment:
`trigger` can be null here and would be added to `triggers`, which will
later cause a guaranteed `NullPointerException` in the `trigger.run()` loop.
Additionally, a single trigger throwing an exception will prevent subsequent
triggers from running and may propagate back into memory reservation paths.
Recommend: (1) reject null triggers in the constructor and in
`registerTrigger`, and (2) execute triggers defensively (catch `Throwable`
per-trigger so one bad trigger can't break the controller's behavior for other
clients).
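One possible shape for both recommendations, as a sketch only: `TriggerRegistry` and `runTriggers` are hypothetical names standing in for the trigger handling inside the controller, and a real implementation would presumably log the caught failure instead of just counting it.

```java
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the controller's trigger handling.
final class TriggerRegistry {
    private final CopyOnWriteArraySet<Runnable> triggers = new CopyOnWriteArraySet<>();
    private final AtomicBoolean triggerRunning = new AtomicBoolean(false);

    // (1) Reject null at registration so the failure surfaces at the call
    // site instead of later inside the trigger loop.
    void registerTrigger(Runnable trigger) {
        triggers.add(Objects.requireNonNull(trigger, "trigger"));
    }

    // (2) Run every trigger, isolating failures; returns how many threw.
    int runTriggers() {
        if (!triggerRunning.compareAndSet(false, true)) {
            return 0; // another thread is already running the triggers
        }
        int failures = 0;
        try {
            for (Runnable trigger : triggers) {
                try {
                    trigger.run();
                } catch (Throwable t) {
                    // One failing trigger must not stop the remaining ones
                    // or propagate into the memory reservation path.
                    failures++;
                }
            }
        } finally {
            triggerRunning.set(false);
        }
        return failures;
    }
}
```

The three-argument constructor could route through the same `registerTrigger` check, so both entry points reject null in one place.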
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java:
##########
@@ -71,7 +71,12 @@ enum SharedResource {
// pulsar-lookup threadpool
LookupExecutor(SharedResourceType.ThreadPool),
// DNS resolver and cache that must be shared together with eventLoopGroup
- DnsResolver(SharedResourceType.DnsResolver);
+ DnsResolver(SharedResourceType.DnsResolver),
+ // pulsar client global memory limit controller
+ MemoryLimitController(SharedResourceType.MemoryLimitController),
+ // pulsar client global open telemetry instance
+ OpenTelemetry(SharedResourceType.OpenTelemetry)
Review Comment:
The PR description focuses on a shared `MemoryLimitController`, but the
public API is also expanded to include a new shared resource type for
`OpenTelemetry` plus related builder/config APIs. Since the PR marks 'public
API' impact, the description should explicitly mention the added shared
OpenTelemetry resource/config surface (or clarify that it's only used to enable
metrics for the shared memory controller).
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java:
##########
@@ -277,6 +313,18 @@ public PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> c
return this;
}
+ @Override
+ public PulsarClientSharedResourcesBuilder configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer) {
+ configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.MemoryLimitController));
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder configureOpenTelemetry(Consumer<OpenTelemetry> configurer) {
+ configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.OpenTelemetry));
+ return this;
+ }
Review Comment:
This does not type-check: `getOrCreateConfig(...)` returns an
`OpenTelemetryResourceConfig` (implements `OpenTelemetryConfig`), but
`configurer` expects `OpenTelemetry`. This is a compilation-breaking mismatch.
Update this method (and the public interface) to accept
`Consumer<OpenTelemetryConfig>` (or compatible generic bounds) and pass the
`OpenTelemetryConfig` implementation to the consumer.
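A self-contained sketch of the type alignment, under stated assumptions: the `OpenTelemetry` interface below is an empty placeholder for the real OpenTelemetry API type, the `openTelemetry(...)` setter name is invented for illustration, and `SharedResourcesBuilderSketch` stands in for the builder impl.

```java
import java.util.function.Consumer;

// Placeholder for the real OpenTelemetry API type.
interface OpenTelemetry {}

interface OpenTelemetryConfig {
    // Assumed setter name, for illustration only.
    OpenTelemetryConfig openTelemetry(OpenTelemetry openTelemetry);
}

// Mirrors the relationship named in the comment above: the resource config
// implements the public config interface.
final class OpenTelemetryResourceConfig implements OpenTelemetryConfig {
    OpenTelemetry openTelemetry;

    @Override
    public OpenTelemetryConfig openTelemetry(OpenTelemetry openTelemetry) {
        this.openTelemetry = openTelemetry;
        return this;
    }
}

// Hypothetical stand-in for the builder implementation.
final class SharedResourcesBuilderSketch {
    final OpenTelemetryResourceConfig openTelemetryConfig = new OpenTelemetryResourceConfig();

    // The consumer's type parameter now matches the object it receives.
    SharedResourcesBuilderSketch configureOpenTelemetry(Consumer<OpenTelemetryConfig> configurer) {
        configurer.accept(openTelemetryConfig);
        return this;
    }
}
```

A caller would then write something like `builder.configureOpenTelemetry(c -> c.openTelemetry(myOtel))`, which mirrors how `configureMemoryLimitController` is used.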
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.