This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0a717fb5765039a9e5456d682255b9edb1134021 Author: Hao Zhang <[email protected]> AuthorDate: Tue Mar 10 01:04:10 2026 +0800 [fix][broker] Return failed future instead of throwing exception in async methods (#25289) Co-authored-by: 张浩 <[email protected]> (cherry picked from commit 35dae97c0bf5e3810d92ba12922e98a85d9634ec) --- .../authorization/PulsarAuthorizationProvider.java | 2 +- .../pulsar/broker/admin/impl/BrokersBase.java | 4 +-- .../pulsar/broker/admin/impl/NamespacesBase.java | 5 +-- .../broker/admin/impl/PersistentTopicsBase.java | 37 ++++++++++++---------- .../pulsar/broker/delayed/bucket/Bucket.java | 5 +-- .../channel/ServiceUnitStateChannelImpl.java | 2 +- .../pulsar/broker/namespace/NamespaceService.java | 16 +++++++--- .../pulsar/broker/service/BrokerService.java | 8 +++-- .../SystemTopicBasedTopicPoliciesService.java | 4 ++- .../org/apache/pulsar/broker/service/Topic.java | 4 ++- .../service/nonpersistent/NonPersistentTopic.java | 3 +- .../prometheus/PrometheusMetricsGenerator.java | 3 +- .../pulsar/broker/systopic/SystemTopicClient.java | 3 +- .../pulsar/broker/web/PulsarWebResource.java | 6 +++- .../compaction/PulsarCompactionServiceFactory.java | 10 ++++-- .../compaction/StrategicTwoPhaseCompactor.java | 2 +- .../client/impl/PulsarChannelInitializer.java | 12 ++++--- .../org/apache/pulsar/common/util/FutureUtil.java | 21 ++++++++---- .../pulsar/common/util/netty/ChannelFutures.java | 6 ++-- .../pulsar/common/util/netty/NettyFutureUtil.java | 10 ++++-- .../common/util/netty/ChannelFuturesTest.java | 12 +++++-- 21 files changed, 116 insertions(+), 59 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 976e7b7ee12..e4f2ac7e8cd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -326,7 +326,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) { boolean sameNamespace = namespaces.distinct().count() == 1; if (!sameNamespace) { - throw new IllegalArgumentException("The namespace should be the same"); + return FutureUtil.failedFuture(new IllegalArgumentException("The namespace should be the same")); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 1e4e4ff66dd..3ee2a1285d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -420,7 +420,8 @@ public class BrokersBase extends AdminResource { private CompletableFuture<Void> internalDeleteDynamicConfigurationOnMetadataAsync(String configName) { if (!pulsar().getBrokerService().isDynamicConfiguration(configName)) { - throw new RestException(Status.PRECONDITION_FAILED, "Can't delete non-dynamic configuration"); + return FutureUtil.failedFuture( + new RestException(Status.PRECONDITION_FAILED, "Can't delete non-dynamic configuration")); } else { return dynamicConfigurationResources().setDynamicConfigurationAsync(old -> { if (old != null) { @@ -536,4 +537,3 @@ public class BrokersBase extends AdminResource { return CompletableFuture.completedFuture(null); } } - diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index b9ec275f79c..bb4cac85087 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -748,7 +748,8 @@ public abstract class NamespacesBase extends AdminResource { private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) { boolean sameNamespace = namespaces.distinct().count() == 1; if (!sameNamespace) { - throw new RestException(Status.BAD_REQUEST, "The namespace should be the same"); + return FutureUtil.failedFuture( + new RestException(Status.BAD_REQUEST, "The namespace should be the same")); } return CompletableFuture.completedFuture(null); } @@ -1971,7 +1972,7 @@ public abstract class NamespacesBase extends AdminResource { + " Repl clusters: %s, allowed clusters: %s", ns.toString(), policies.replication_clusters, policies.allowed_clusters); log.info(msg); - throw new RestException(Status.BAD_REQUEST, msg); + return FutureUtil.failedFuture(new RestException(Status.BAD_REQUEST, msg)); } pulsar().getBrokerService().setCurrentClusterAllowedIfNoClusterIsAllowed(ns, policies); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0a683f4b872..3e625a8d5ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1070,8 +1070,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNumToSet, boolean isGlobal) { if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, - "maxUnackedNum must be 0 or more"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "maxUnackedNum must be 0 or more")); } return pulsar().getTopicPoliciesService() @@ -1095,8 +1095,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNumToSet, boolean isGlobal) { if (maxUnackedNumToSet != null && maxUnackedNumToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, - "maxUnackedNum must be 0 or more"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "maxUnackedNum must be 0 or more")); } return pulsar().getTopicPoliciesService() @@ -1108,7 +1108,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer intervalToSet, boolean isGlobal) { if (intervalToSet != null && intervalToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "interval must be 0 or more")); } return pulsar().getTopicPoliciesService() .updateTopicPoliciesAsync(topicName, isGlobal, intervalToSet == null, policies -> { @@ -3616,9 +3617,9 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSizeToSet, boolean isGlobal) { if (maxMessageSizeToSet != null && (maxMessageSizeToSet < 0 || maxMessageSizeToSet > config().getMaxMessageSize())) { - throw new RestException(Status.PRECONDITION_FAILED - , "topic-level maxMessageSize must be greater than or equal to 0 " - + "and must be smaller than that in the broker-level"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "topic-level maxMessageSize must be greater than or equal to 0 " + + "and must be smaller than that in the broker-level")); } return pulsar().getTopicPoliciesService() @@ -3646,8 +3647,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducersToSet, boolean isGlobal) { if (maxProducersToSet != null && maxProducersToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, - "maxProducers must be 0 or more"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "maxProducers must be 0 or more")); } return pulsar().getTopicPoliciesService() .updateTopicPoliciesAsync(topicName, isGlobal, maxProducersToSet == null, policies -> { @@ -3663,8 +3664,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsToSet, boolean isGlobal) { if (maxSubscriptionsToSet != null && maxSubscriptionsToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, - "maxSubscriptionsPerTopic must be 0 or more"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "maxSubscriptionsPerTopic must be 0 or more")); } return pulsar().getTopicPoliciesService() @@ -3748,8 +3749,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumersToSet, boolean isGlobal) { if (maxConsumersToSet != null && maxConsumersToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, - "maxConsumers must be 0 or more"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "maxConsumers must be 0 or more")); } return pulsar().getTopicPoliciesService() .updateTopicPoliciesAsync(topicName, isGlobal, maxConsumersToSet == null, policies -> { @@ -4650,7 +4651,7 @@ public class PersistentTopicsBase extends AdminResource { futures.add(pulsar().getAdminClient().topics().trimTopicAsync(topicNamePartition.toString())); } catch (Exception e) { log.error("[{}] Failed to trim topic {}", clientAppId(), topicNamePartition, e); - throw new RestException(e); + return FutureUtil.failedFuture(new RestException(e)); } } return FutureUtil.waitForAll(futures).thenAccept(asyncResponse::resume); @@ -4764,7 +4765,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription( Integer maxConsumersPerSubscriptionToSet, boolean isGlobal) { if (maxConsumersPerSubscriptionToSet != null && maxConsumersPerSubscriptionToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Invalid value for maxConsumersPerSubscription")); } return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { policies.setMaxConsumersPerSubscription(maxConsumersPerSubscriptionToSet); @@ -4793,7 +4795,8 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThresholdToSet, boolean isGlobal) { if (compactionThresholdToSet != null && compactionThresholdToSet < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Invalid value for compactionThreshold")); } return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index a1693b1553d..776f99b120c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -23,7 +23,6 @@ import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTrack import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import lombok.AllArgsConstructor; @@ -158,7 +157,9 @@ abstract class Bucket { } private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long bucketId) { - Objects.requireNonNull(bucketId); + if (bucketId == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected bucketId should not be null")); + } return sequencer.sequential(() -> { return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)), ManagedLedgerException.BadVersionException.class, MaxRetryTimes); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 8b48e4b4516..a0e9f1f73e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -671,7 +671,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private CompletableFuture<Void> publishOverrideEventAsync(String serviceUnit, ServiceUnitStateData override) { if (!validateChannelState(Started, true)) { - throw new IllegalStateException("Invalid channel state:" + channelState.name()); + return FutureUtil.failedFuture(new IllegalStateException("Invalid channel state:" + channelState.name())); } EventType eventType = EventType.Override; eventCounters.get(eventType).getTotal().incrementAndGet(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index b5dbddc2ea3..f17526416e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1143,8 +1143,12 @@ public class NamespaceService implements AutoCloseable { */ public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname, NamespaceBundles nsBundles) { - Objects.requireNonNull(nsname); - Objects.requireNonNull(nsBundles); + if (nsname == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected NamespaceName should not be null")); + } + if (nsBundles == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected NamespaceBundles should not be null")); + } return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname).thenCompose(policies -> { if (policies.isPresent()) { @@ -1170,8 +1174,12 @@ public class NamespaceService implements AutoCloseable { * @param nsBundles the new namespace bundles */ public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) { - Objects.requireNonNull(nsname); - Objects.requireNonNull(nsBundles); + if (nsname == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected NamespaceName should not be null")); + } + if (nsBundles == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected NamespaceBundles should not be null")); + } LocalPolicies localPolicies = nsBundles.toLocalPolicies(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7e9819f7f04..27008e96b37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1983,7 +1983,9 @@ public class BrokerService implements Closeable { } public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull TopicName topicName) { - requireNonNull(topicName); + if (topicName == null) { + return FutureUtil.failedFuture(new NullPointerException("topicName")); + } NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); @@ -3783,7 +3785,9 @@ public class BrokerService implements Closeable { } public @NonNull CompletableFuture<Boolean> isAllowAutoSubscriptionCreationAsync(@NonNull TopicName tpName) { - requireNonNull(tpName); + if (tpName == null) { + return FutureUtil.failedFuture(new NullPointerException("tpName")); + } // Policies priority: topic level -> namespace level -> broker level if (ExtensibleLoadManagerImpl.isInternalTopic(tpName.toString())) { return CompletableFuture.completedFuture(true); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index c3d88b9c723..ad18af308e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -565,7 +565,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic @VisibleForTesting @NonNull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@NonNull NamespaceName namespace) { - requireNonNull(namespace); + if (namespace == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected NamespaceName should not be null")); + } if (closed.get()) { return CompletableFuture.completedFuture(false); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 8f66d9c0e3e..fd3ffb1a34c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.utils.StatsOutputStream; @@ -291,7 +292,8 @@ public interface Topic { * Get the last message position that can be dispatch. */ default CompletableFuture<Position> getLastDispatchablePosition() { - throw new UnsupportedOperationException("getLastDispatchablePosition is not supported by default"); + return FutureUtil.failedFuture( + new UnsupportedOperationException("getLastDispatchablePosition is not supported by default")); } CompletableFuture<MessageId> getLastMessageId(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 1931a09497e..e79c642e7b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1219,7 +1219,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol @Override public CompletableFuture<MessageId> getLastMessageId() { - throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic"); + return FutureUtil.failedFuture( + new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic")); } private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 97d5a7bc953..2bb6372c1cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -60,6 +60,7 @@ import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass; import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SimpleTextOutputStream; /** @@ -139,7 +140,7 @@ public class PrometheusMetricsGenerator implements AutoCloseable { public synchronized CompletableFuture<ByteBuf> getCompressedBuffer(Executor executor) { if (released) { - throw new IllegalStateException("Already released!"); + return FutureUtil.failedFuture(new IllegalStateException("Already released!")); } if (compressedBuffer == null) { compressedBuffer = new CompletableFuture<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java index 88ca099b4ca..20521780bff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; /** * Pulsar system topic. @@ -123,7 +124,7 @@ public interface SystemTopicClient<T> { * @return message id future */ default CompletableFuture<MessageId> deleteAsync(String key, T t) { - throw new UnsupportedOperationException("Unsupported operation"); + return FutureUtil.failedFuture(new UnsupportedOperationException("Unsupported operation")); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 65489eaa34b..9291097d271 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -213,7 +213,11 @@ public abstract class PulsarWebResource { isClientAuthenticated(appId), appId); } String originalPrincipal = originalPrincipal(); - validateOriginalPrincipal(appId, originalPrincipal); + try { + validateOriginalPrincipal(appId, originalPrincipal); + } catch (RestException e) { + return FutureUtil.failedFuture(e); + } if (pulsar.getConfiguration().getProxyRoles().contains(appId)) { BrokerService brokerService = pulsar.getBrokerService(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java index ff0fa0bcd12..b44480378c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java @@ -19,13 +19,13 @@ package org.apache.pulsar.compaction; import com.google.common.annotations.VisibleForTesting; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import lombok.AccessLevel; import lombok.Getter; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.util.FutureUtil; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -61,14 +61,18 @@ public class PulsarCompactionServiceFactory implements CompactionServiceFactory @Override public CompletableFuture<Void> initialize(@NonNull PulsarService pulsarService) { - Objects.requireNonNull(pulsarService); + if (pulsarService == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected pulsarService should not be null")); + } this.pulsarService = pulsarService; return CompletableFuture.completedFuture(null); } @Override public CompletableFuture<TopicCompactionService> newTopicCompactionService(@NonNull String topic) { - Objects.requireNonNull(topic); + if (topic == null) { + return FutureUtil.failedFuture(new NullPointerException("Expected topic should not be null")); + } PulsarTopicCompactionService pulsarTopicCompactionService = new PulsarTopicCompactionService(topic, pulsarService.getBookKeeperClient(), () -> { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java index 1b54092d9aa..3c674975742 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java @@ -76,7 +76,7 @@ public class StrategicTwoPhaseCompactor extends PublishingOrderCompactor { } public CompletableFuture<Long> compact(String topic) { - throw new UnsupportedOperationException(); + return FutureUtil.failedFuture(new UnsupportedOperationException()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index 0d3f8a4c619..84198967601 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -27,7 +27,6 @@ import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslHandler; import java.net.InetSocketAddress; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; @@ -40,6 +39,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.FrameDecoderUtil; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.PulsarSslConfiguration; import org.apache.pulsar.common.util.PulsarSslFactory; import org.apache.pulsar.common.util.SecurityUtility; @@ -106,10 +106,14 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> * @return a {@link CompletableFuture} that completes when the TLS is set up. */ CompletableFuture<Channel> initTls(Channel ch, InetSocketAddress sniHost) { - Objects.requireNonNull(ch, "A channel is required"); - Objects.requireNonNull(sniHost, "A sniHost is required"); + if (ch == null) { + return FutureUtil.failedFuture(new NullPointerException("A channel is required")); + } + if (sniHost == null) { + return FutureUtil.failedFuture(new NullPointerException("A sniHost is required")); + } if (!tlsEnabled) { - throw new IllegalStateException("TLS is not enabled in client configuration"); + return FutureUtil.failedFuture(new IllegalStateException("TLS is not enabled in client configuration")); } CompletableFuture<Channel> initTlsFuture = new CompletableFuture<>(); ch.eventLoop().execute(() -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index ae784a1b18d..1b43d17d9d1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -228,10 +227,13 @@ public class FutureUtil { } /** - * @throws NullPointerException NPE when param is null + * @return a {@link CompletableFuture} representing the newly scheduled task, + * or one completed exceptionally with {@link NullPointerException} if param is null. */ public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) { - Objects.requireNonNull(newTask); + if (newTask == null) { + return failedFuture(new NullPointerException("Expected Supplier should not be null")); + } if (sequencerFuture.isDone()) { if (sequencerFuture.isCompletedExceptionally() && allowExceptionBreakChain) { return sequencerFuture; @@ -282,13 +284,18 @@ public class FutureUtil { } /** - * @throws RejectedExecutionException if this task cannot be accepted for execution - * @throws NullPointerException if one of params is null + * @return a {@link CompletableFuture} representing the asynchronous composition. + * The returned future is completed exceptionally with {@link NullPointerException} if one of params is null, + * or with {@link RejectedExecutionException} if the task cannot be accepted for execution. */ public static <T> @NonNull CompletableFuture<T> composeAsync(Supplier<CompletableFuture<T>> futureSupplier, Executor executor) { - Objects.requireNonNull(futureSupplier); - Objects.requireNonNull(executor); + if (futureSupplier == null) { + return failedFuture(new NullPointerException("Expected Supplier should not be null")); + } + if (executor == null) { + return failedFuture(new NullPointerException("Expected Executor should not be null")); + } final CompletableFuture<T> future = new CompletableFuture<>(); try { executor.execute(() -> futureSupplier.get().whenComplete((result, error) -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java index 294c0db99ea..481d47a21c2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/ChannelFutures.java @@ -20,8 +20,8 @@ package org.apache.pulsar.common.util.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import java.util.Objects; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.util.FutureUtil; /** * Static utility methods for operating on {@link ChannelFuture}s. @@ -41,7 +41,9 @@ public class ChannelFutures { * and completes exceptionally if the channelFuture completes with a {@link Throwable} */ public static CompletableFuture<Channel> toCompletableFuture(ChannelFuture channelFuture) { - Objects.requireNonNull(channelFuture, "channelFuture cannot be null"); + if (channelFuture == null) { + return FutureUtil.failedFuture(new NullPointerException("channelFuture cannot be null")); + } CompletableFuture<Channel> adapter = new CompletableFuture<>(); if (channelFuture.isDone()) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java index 8686a381679..109fa2fa265 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/NettyFutureUtil.java @@ -19,8 +19,8 @@ package org.apache.pulsar.common.util.netty; import io.netty.util.concurrent.Future; -import java.util.Objects; import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.common.util.FutureUtil; /** * Contains utility methods for working with Netty Futures. @@ -34,7 +34,9 @@ public class NettyFutureUtil { * @return converted future instance */ public static <V> CompletableFuture<V> toCompletableFuture(Future<V> future) { - Objects.requireNonNull(future, "future cannot be null"); + if (future == null) { + return FutureUtil.failedFuture(new NullPointerException("future cannot be null")); + } CompletableFuture<V> adapter = new CompletableFuture<>(); if (future.isDone()) { @@ -62,7 +64,9 @@ public class NettyFutureUtil { * @return converted future instance */ public static CompletableFuture<Void> toCompletableFutureVoid(Future<?> future) { - Objects.requireNonNull(future, "future cannot be null"); + if (future == null) { + return FutureUtil.failedFuture(new NullPointerException("future cannot be null")); + } CompletableFuture<Void> adapter = new CompletableFuture<>(); if (future.isDone()) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java index 31c0eb4af2b..65b9e7b6339 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/ChannelFuturesTest.java @@ -23,6 +23,7 @@ import io.netty.channel.Channel; import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultEventLoop; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.mockito.Mock; @@ -65,9 +66,16 @@ public class ChannelFuturesTest { channelFuture = new DefaultChannelPromise(channel); } - @Test(expectedExceptions = NullPointerException.class) + @Test public void toCompletableFuture_shouldRequireNonNullArgument() { - ChannelFutures.toCompletableFuture(null); + CompletableFuture<Channel> future = ChannelFutures.toCompletableFuture(null); + Assert.assertTrue(future.isCompletedExceptionally()); + try { + future.join(); + Assert.fail("Expected NullPointerException"); + } catch (CompletionException e) { + Assert.assertTrue(e.getCause() instanceof NullPointerException); + } } @Test
