This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cf2faefd3ba [cleanup][client] Remove unused RetryUtil class (#25934)
cf2faefd3ba is described below
commit cf2faefd3ba7b1700921e4e104d02a057d316130
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jun 4 22:01:27 2026 +0300
[cleanup][client] Remove unused RetryUtil class (#25934)
---
.../apache/pulsar/client/impl/RetryUtilTest.java | 84 ----------------------
.../org/apache/pulsar/client/util/RetryUtil.java | 67 -----------------
2 files changed, 151 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
deleted file mode 100644
index 555c0ca0c70..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Cleanup;
-import org.apache.pulsar.client.util.RetryUtil;
-import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.testng.annotations.Test;
-
-@Test(groups = "utils")
-public class RetryUtilTest {
-
-
- @Test
- public void testFailAndRetry() throws Exception {
- @Cleanup("shutdownNow")
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- CompletableFuture<Boolean> callback = new CompletableFuture<>();
- AtomicInteger atomicInteger = new AtomicInteger(0);
- Backoff backoff = Backoff.builder()
- .initialDelay(Duration.ofMillis(100))
- .maxBackoff(Duration.ofMillis(2000))
- .mandatoryStop(Duration.ofMillis(5000))
- .build();
- RetryUtil.retryAsynchronously(() -> {
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- atomicInteger.incrementAndGet();
- if (atomicInteger.get() < 5) {
- future.completeExceptionally(new RuntimeException("fail"));
- } else {
- future.complete(true);
- }
- return future;
- }, backoff, executor, callback);
- assertTrue(callback.get());
- assertEquals(atomicInteger.get(), 5);
- }
-
- @Test
- public void testFail() throws Exception {
- @Cleanup("shutdownNow")
- ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
- CompletableFuture<Boolean> callback = new CompletableFuture<>();
- Backoff backoff = Backoff.builder()
- .initialDelay(Duration.ofMillis(500))
- .maxBackoff(Duration.ofMillis(2000))
- .mandatoryStop(Duration.ofMillis(5000))
- .build();
- long start = System.currentTimeMillis();
- RetryUtil.retryAsynchronously(() ->
- FutureUtil.failedFuture(new RuntimeException("fail")), backoff, executor, callback);
- try {
- callback.get();
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("fail"));
- }
- long time = System.currentTimeMillis() - start;
- assertTrue(time >= 5000 - 2000, "Duration:" + time);
- }
-}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
deleted file mode 100644
index 0ba61c1d854..00000000000
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.util;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import lombok.CustomLog;
-import org.apache.pulsar.common.util.Backoff;
-
-@CustomLog
-public class RetryUtil {
-
- public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
- ScheduledExecutorService scheduledExecutorService,
- CompletableFuture<T> callback) {
- if (backoff.getMax().isZero() || backoff.getMax().isNegative()) {
- throw new IllegalArgumentException("Illegal max retry time");
- }
- if (backoff.getInitial().isZero() || backoff.getInitial().isNegative()) {
- throw new IllegalArgumentException("Illegal initial time");
- }
- scheduledExecutorService.execute(() ->
- executeWithRetry(supplier, backoff, scheduledExecutorService, callback));
- }
-
- private static <T> void executeWithRetry(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
- ScheduledExecutorService scheduledExecutorService,
- CompletableFuture<T> callback) {
- supplier.get().whenComplete((result, e) -> {
- if (e != null) {
- long next = backoff.next().toMillis();
- boolean isMandatoryStop = backoff.isMandatoryStopMade();
- if (isMandatoryStop) {
- callback.completeExceptionally(e);
- } else {
- log.warn().exceptionMessage(e)
- .attr("nextMs", next)
- .log("Execution with retry failed, will retry");
- scheduledExecutorService.schedule(() ->
- executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
- next, TimeUnit.MILLISECONDS);
- }
- return;
- }
- callback.complete(result);
- });
- }
-
-}