ashwindakappagari commented on issue #25936:
URL: https://github.com/apache/pulsar/issues/25936#issuecomment-4626330592

> Thanks [@lhotari](https://github.com/lhotari), I agree with the one-liner approach. One question: the residual gap, where producer creation succeeds slightly after `operationTimeoutMs` has elapsed, is exactly the bug class we're fixing, just narrower.
> In environments where DNS recovery after a broker pod replacement can exceed 30s (we observed 12s, but tail latencies during AZ-spanning rollouts have been observed to be higher), an orphaned producer can still be created.
> Would you be open to also applying a `whenComplete`-style cleanup to the residual future? The diff is +5 lines on top of your one-liner and eliminates the gap entirely.
> Otherwise, would you recommend tuning `operationTimeoutMs` upward as the documented mitigation for at-risk environments? I hope that's a fair question, and I appreciate your quick response.
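
To make the residual gap concrete, here is a minimal JDK-only sketch of the race: a creation future that completes after the caller's timeout. `FakeProducer`, `attempt`, and the millisecond delays are hypothetical stand-ins, not Pulsar types; the point is only that once `get` has timed out, nothing closes the late result unless a callback such as `whenComplete` is attached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Demo only: FakeProducer and the delays are hypothetical stand-ins, not Pulsar APIs.
final class ResidualGapDemo {
    // Stand-in for a producer that keeps exclusive access to its topic until closed.
    static final class FakeProducer {
        final AtomicBoolean closed = new AtomicBoolean(false);

        void close() {
            closed.set(true);
        }
    }

    // One attempt: creation takes createDelayMs, but the caller waits at most timeoutMs.
    // Returns the producer that was created late, after any attached cleanup has run.
    static FakeProducer attempt(long createDelayMs, long timeoutMs, boolean cleanup) throws Exception {
        CompletableFuture<FakeProducer> future = CompletableFuture.supplyAsync(
                FakeProducer::new,
                CompletableFuture.delayedExecutor(createDelayMs, TimeUnit.MILLISECONDS));
        CompletableFuture<FakeProducer> settled = future;
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller has given up; without a callback nobody owns the late result.
            if (cleanup) {
                settled = future.whenComplete((p, err) -> {
                    if (p != null) {
                        p.close();
                    }
                });
            }
        }
        // Demo only: block until the late creation (and the cleanup, if attached) finishes.
        return settled.join();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("without cleanup, closed=" + attempt(500, 50, false).closed.get());
        System.out.println("with cleanup, closed=" + attempt(500, 50, true).closed.get());
    }
}
```

Running `main` should report the late producer as still open without the callback and closed with it. Joining on the stage returned by `whenComplete`, rather than on the original future, is deliberate: that stage only completes after the cleanup action has run.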
   
Hi @lhotari. This is what I have in mind:
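```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;

// Method of the enclosing class, which provides the SLF4J `log` field and NotLeaderAnymore.
public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client,
                                                                String topic,
                                                                String producerName,
                                                                Supplier<Boolean> isLeader,
                                                                int sleepInBetweenMs) throws NotLeaderAnymore {
    try {
        int tries = 0;
        do {
            CompletableFuture<Producer<byte[]>> producerFuture = null;
            try {
                producerFuture = client.newProducer().topic(topic)
                        .accessMode(ProducerAccessMode.Exclusive)
                        .enableBatching(false)
                        .blockIfQueueFull(true)
                        .compressionType(CompressionType.LZ4)
                        .producerName(producerName)
                        .createAsync();
                return producerFuture.get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.info("Encountered exception while creating exclusive producer to topic {}",
                        topic, e);
                if (producerFuture != null) {
                    // If creation completes after we stopped waiting, close the late producer
                    // so it does not keep holding exclusive access to the topic as an orphan.
                    producerFuture.whenComplete((p, err) -> {
                        if (p != null) {
                            // Best-effort close: an async close reports failure through the
                            // returned future, so the failure is swallowed there.
                            p.closeAsync().exceptionally(closeErr -> null);
                        }
                    });
                }
            }
            tries++;
            if (tries % 6 == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("Failed to acquire exclusive producer ... after {} attempts.", topic, tries);
                }
            }
            Thread.sleep(sleepInBetweenMs);
        } while (isLeader.get());
    } catch (InterruptedException e) {
        throw new RuntimeException("Failed to create exclusive producer on topic " + topic, e);
    }
    throw new NotLeaderAnymore();
}
```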
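
`Producer.closeAsync()` returns a `CompletableFuture<Void>`, so a failed close normally surfaces through that future rather than as an exception thrown from the call itself. A `try`/`catch` around the call will not observe it, while `exceptionally` on the returned future will. A minimal JDK-only sketch, where `closeAsync()` is a hypothetical stand-in rather than the Pulsar method:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Demo only: closeAsync() here is a hypothetical stand-in, not Pulsar's Producer.closeAsync().
final class AsyncCloseErrors {
    // An async close whose failure happens after the method has already returned.
    static CompletableFuture<Void> closeAsync() {
        return CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("close failed");
        });
    }

    // A try/catch around the call only sees synchronous throws, so this returns false.
    static boolean caughtBySurroundingTry() {
        try {
            closeAsync();
            return false;
        } catch (Exception e) {
            return true;
        }
    }

    // exceptionally() on the returned future does observe the failure
    // (wrapped in a CompletionException by runAsync).
    static Throwable seenViaFuture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        closeAsync().exceptionally(err -> {
            seen.set(err);
            return null;
        }).join();
        return seen.get();
    }
}
```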
   

