ashwindakappagari commented on issue #25936: URL: https://github.com/apache/pulsar/issues/25936#issuecomment-4626330592
> Thanks [@lhotari](https://github.com/lhotari) — agree on the one-liner approach. One question: the residual gap where producer creation succeeds slightly past operationTimeoutMs is exactly the bug class we're fixing, just narrower. In environments where broker pod replacement DNS recovery can exceed 30s (we observed 12s, but tail latencies during AZ-spanning rollouts have been seen higher), the orphan can still form. Would you be open to additionally applying a whenComplete-style cleanup on the residual future? Diff is +5 lines on top of your one-liner, eliminates the gap entirely. Otherwise, would you recommend tuning operationTimeoutMs upward as the documented mitigation for environments at risk? Hope thats a fair Q. Appreciate your quick response. Hi @lhotari . This is what I have in mind. ``` public static Producer<byte[]> createExclusiveProducerWithRetry(PulsarClient client, String topic, String producerName, Supplier<Boolean> isLeader, int sleepInBetweenMs) throws NotLeaderAnymore { try { int tries = 0; do { CompletableFuture<Producer<byte[]>> producerFuture = null; try { producerFuture = client.newProducer().topic(topic) .accessMode(ProducerAccessMode.Exclusive) .enableBatching(false) .blockIfQueueFull(true) .compressionType(CompressionType.LZ4) .producerName(producerName) .createAsync(); return producerFuture.get(10, TimeUnit.SECONDS); } catch (Exception e) { log.info("Encountered exception while at creating exclusive producer to topic {}", topic, e); if (producerFuture != null) { producerFuture.whenComplete((p, err) -> { if (p != null) { try { p.closeAsync(); } catch (Exception ignored) { } } }); } } tries++; if (tries % 6 == 0) { if (log.isDebugEnabled()) { log.debug("Failed to acquire exclusive producer ... after {} attempts.", topic, tries); } } Thread.sleep(sleepInBetweenMs); } while (isLeader.get()); } catch (InterruptedException e) { throw new RuntimeException("Failed to create exclusive producer on topic " + topic, e); } throw new NotLeaderAnymore(); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
