Copilot commented on code in PR #25477:
URL: https://github.com/apache/pulsar/pull/25477#discussion_r3044160452


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java:
##########
@@ -119,4 +120,20 @@ PulsarClientSharedResourcesBuilder configureThreadPool(SharedResource sharedReso
      */
     PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> configurer);
 
+    /**
+     * Configures the memory limit settings.
+     *
+     * @param configurer a consumer that configures the memory limit settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer);
+
+    /**
+     * Configures the open telemetry settings.
+     *
+     * @param configurer a consumer that configures the open telemetry settings
+     * @return this builder instance for method chaining
+     */
+    PulsarClientSharedResourcesBuilder configureOpenTelemetry(Consumer<OpenTelemetry> configurer);

Review Comment:
   `configureOpenTelemetry` takes `Consumer<OpenTelemetry>`, but the 
implementation/config model added in this PR is `OpenTelemetryConfig` (and the 
builder impl stores an `OpenTelemetry` inside a config object). As written, 
this signature prevents callers from configuring the resource through the 
config interface and leaves the impl with an incompatible call site. Change the 
signature to accept a config type (e.g., `Consumer<OpenTelemetryConfig>`) to 
match `configureMemoryLimitController` and the new `OpenTelemetryConfig` 
interface.
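
The suggested shape can be sketched as follows. This is a minimal illustration only: `TelemetryHandle`, `OpenTelemetryConfigSketch`, its `openTelemetry(...)` setter, and the builder types are hypothetical stand-ins, not the actual Pulsar or OpenTelemetry API, and the real `OpenTelemetryConfig` methods may differ.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for io.opentelemetry.api.OpenTelemetry (illustration only).
interface TelemetryHandle {}

// Hypothetical stand-in for the PR's OpenTelemetryConfig; the setter name is assumed.
interface OpenTelemetryConfigSketch {
    OpenTelemetryConfigSketch openTelemetry(TelemetryHandle handle);
}

interface SharedResourcesBuilderSketch {
    // Accepts a consumer of the config type, mirroring configureMemoryLimitController.
    SharedResourcesBuilderSketch configureOpenTelemetry(Consumer<OpenTelemetryConfigSketch> configurer);
}

final class SharedResourcesBuilderSketchImpl implements SharedResourcesBuilderSketch {

    // Config object that stores the instance, as the comment describes for the builder impl.
    static final class Config implements OpenTelemetryConfigSketch {
        TelemetryHandle handle;

        @Override
        public OpenTelemetryConfigSketch openTelemetry(TelemetryHandle handle) {
            this.handle = handle;
            return this;
        }
    }

    final Config config = new Config();

    @Override
    public SharedResourcesBuilderSketch configureOpenTelemetry(Consumer<OpenTelemetryConfigSketch> configurer) {
        // The consumer receives the config object, so the argument type matches the parameter type.
        configurer.accept(config);
        return this;
    }
}
```

With this shape a caller would write something like `builder.configureOpenTelemetry(c -> c.openTelemetry(handle))`, and the impl can hand its own config object to the consumer without a cast.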



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java:
##########
@@ -152,4 +157,12 @@ public boolean isMemoryLimited() {
     public long memoryLimit() {
         return memoryLimit;
     }
+
+    public void registerTrigger(Runnable trigger) {
+        triggers.add(trigger);

Review Comment:
   When sharing a controller, a client can register its trigger while 
`currentUsage` is already >= `triggerThreshold`. With the current 
edge-triggered design (only fires on threshold crossing), newly registered 
triggers may never run until usage drops below the threshold and crosses again, 
so late-joining clients may not shrink receiver queues under existing memory 
pressure. Consider invoking the newly registered trigger immediately (guarded 
by `triggerRunning`) when `currentUsage.get() >= triggerThreshold`, or 
otherwise ensuring registration can prompt a shrink when already above 
threshold.
   ```suggestion
           triggers.add(trigger);
           if (currentUsage.get() >= triggerThreshold
                   && triggerRunning.compareAndSet(false, true)) {
               try {
                   trigger.run();
               } finally {
                   triggerRunning.set(false);
               }
           }
   ```
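
To make the late-registration gap concrete, here is a minimal sketch under stated assumptions: a much-simplified model of an edge-triggered controller, not the actual `MemoryLimitController`. The class name and the `reserve`, `registerEdgeOnly`, and `registerAndCheck` methods are hypothetical, and the crossing check is assumed from the description above.

```java
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical model; it omits the memory limit, mutex, and blocking paths.
final class EdgeTriggeredControllerSketch {
    private final long triggerThreshold;
    private final CopyOnWriteArraySet<Runnable> triggers = new CopyOnWriteArraySet<>();
    private final AtomicLong currentUsage = new AtomicLong();
    private final AtomicBoolean triggerRunning = new AtomicBoolean(false);

    EdgeTriggeredControllerSketch(long triggerThreshold) {
        this.triggerThreshold = triggerThreshold;
    }

    // Fires the triggers only when this reservation crosses the threshold from below.
    void reserve(long size) {
        long newUsage = currentUsage.addAndGet(size);
        long oldUsage = newUsage - size;
        if (oldUsage < triggerThreshold && newUsage >= triggerThreshold) {
            runGuarded(() -> triggers.forEach(Runnable::run));
        }
    }

    // Edge-only registration: a late joiner waits for the next threshold crossing.
    void registerEdgeOnly(Runnable trigger) {
        triggers.add(trigger);
    }

    // Registration that also checks the current level, as in the suggestion above.
    void registerAndCheck(Runnable trigger) {
        triggers.add(trigger);
        if (currentUsage.get() >= triggerThreshold) {
            runGuarded(trigger);
        }
    }

    // Skips the action if another trigger run is already in progress.
    private void runGuarded(Runnable action) {
        if (triggerRunning.compareAndSet(false, true)) {
            try {
                action.run();
            } finally {
                triggerRunning.set(false);
            }
        }
    }
}
```

In this model, a trigger added through `registerEdgeOnly` while usage is already above the threshold does not run until usage falls below the threshold and crosses it again, whereas `registerAndCheck` runs it once at registration time.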



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java:
##########
@@ -29,22 +30,24 @@ public class MemoryLimitController {
 
     private final long memoryLimit;
     private final long triggerThreshold;
-    private final Runnable trigger;
+    private final CopyOnWriteArraySet<Runnable> triggers = new CopyOnWriteArraySet<>();
     private final AtomicLong currentUsage = new AtomicLong();
     private final ReentrantLock mutex = new ReentrantLock(false);
     private final Condition condition = mutex.newCondition();
     private final AtomicBoolean triggerRunning = new AtomicBoolean(false);
 
     public MemoryLimitController(long memoryLimitBytes) {
-        this.memoryLimit = memoryLimitBytes;
-        triggerThreshold = 0;
-        trigger = null;
+        this(memoryLimitBytes, 0);
     }
 
-    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
+    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold) {
         this.memoryLimit = memoryLimitBytes;
         this.triggerThreshold = triggerThreshold;
-        this.trigger = trigger;
+    }
+
+    public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
+        this(memoryLimitBytes, triggerThreshold);
+        this.triggers.add(trigger);
     }

Review Comment:
   `trigger` can be null here. A null value would still be added to 
`triggers` and would later cause a `NullPointerException` when the 
`trigger.run()` loop executes. Additionally, a single trigger that throws will 
prevent the remaining triggers from running, and the exception may propagate 
back into memory reservation paths. Recommend: (1) reject null triggers in the 
constructor and in `registerTrigger`, and (2) execute triggers defensively 
(catch `Throwable` per trigger so one bad trigger can't break the controller's 
behavior for other clients).
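
Both recommendations can be sketched roughly as below. This is an illustrative sketch, not the PR's code: `DefensiveTriggersSketch`, `runTriggers`, and the failure count it returns are hypothetical, and a real implementation would presumably use the project's logger rather than `System.err`.

```java
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical helper showing null rejection and per-trigger isolation.
final class DefensiveTriggersSketch {
    private final CopyOnWriteArraySet<Runnable> triggers = new CopyOnWriteArraySet<>();

    // (1) Reject null at registration so the failure surfaces at the caller, not in the run loop.
    void registerTrigger(Runnable trigger) {
        triggers.add(Objects.requireNonNull(trigger, "trigger"));
    }

    // (2) Run every trigger; a failure in one does not stop the others or reach the caller.
    // Returns the number of triggers that threw.
    int runTriggers() {
        int failures = 0;
        for (Runnable trigger : triggers) {
            try {
                trigger.run();
            } catch (Throwable t) {
                failures++;
                System.err.println("Memory limit trigger failed: " + t);
            }
        }
        return failures;
    }
}
```

Catching `Throwable` follows the recommendation above; a narrower `catch (Exception e)` would let JVM errors such as `OutOfMemoryError` propagate, which may be preferable depending on how the controller is expected to behave under those conditions.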



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java:
##########
@@ -71,7 +71,12 @@ enum SharedResource {
         // pulsar-lookup threadpool
         LookupExecutor(SharedResourceType.ThreadPool),
         // DNS resolver and cache that must be shared together with eventLoopGroup
-        DnsResolver(SharedResourceType.DnsResolver);
+        DnsResolver(SharedResourceType.DnsResolver),
+        // pulsar client global memory limit controller
+        MemoryLimitController(SharedResourceType.MemoryLimitController),
+        // pulsar client global open telemetry instance
+        OpenTelemetry(SharedResourceType.OpenTelemetry)

Review Comment:
   The PR description focuses on a shared `MemoryLimitController`, but the 
public API is also expanded to include a new shared resource type for 
`OpenTelemetry` plus related builder/config APIs. Since the PR marks 'public 
API' impact, the description should explicitly mention the added shared 
OpenTelemetry resource/config surface (or clarify that it's only used to enable 
metrics for the shared memory controller).



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java:
##########
@@ -277,6 +313,18 @@ public PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig> c
         return this;
     }
 
+    @Override
+    public PulsarClientSharedResourcesBuilder configureMemoryLimitController(Consumer<MemoryLimitConfig> configurer) {
+        configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.MemoryLimitController));
+        return this;
+    }
+
+    @Override
+    public PulsarClientSharedResourcesBuilder configureOpenTelemetry(Consumer<OpenTelemetry> configurer) {
+        configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.OpenTelemetry));
+        return this;
+    }

Review Comment:
   This does not compile: `getOrCreateConfig(...)` returns an 
`OpenTelemetryResourceConfig` (which implements `OpenTelemetryConfig`), but 
`configurer` expects an `OpenTelemetry`. Update this method (and the public 
interface) to accept `Consumer<OpenTelemetryConfig>` (or compatible generic 
bounds) and pass the `OpenTelemetryConfig` implementation to the consumer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
