This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d947b529655483bb65f148c036b5e719151f87b8 Author: Lari Hotari <[email protected]> AuthorDate: Tue Dec 2 17:33:44 2025 +0200 [improve][client] Deduplicate in-progress lookup requests also for HttpLookupService (#25017) (cherry picked from commit b37b5b3fa01494fd6541f4d460f3586443c221b6) --- .../pulsar/client/api/BrokerServiceLookupTest.java | 7 +- .../apache/pulsar/client/api/SimpleSchemaTest.java | 6 +- .../client/impl/BinaryProtoLookupService.java | 205 +++------------ .../pulsar/client/impl/HttpLookupService.java | 14 +- ...rogressDeduplicationDecoratorLookupService.java | 276 +++++++++++++++++++++ .../apache/pulsar/client/impl/LookupService.java | 26 +- .../pulsar/client/impl/PulsarClientImpl.java | 22 +- .../client/impl/BinaryProtoLookupServiceTest.java | 21 +- .../proxy/server/ProxyLookupThrottlingTest.java | 3 +- .../server/ProxyWithExtensibleLoadManagerTest.java | 13 +- .../pulsar/websocket/LookupProtocolTest.java | 29 +-- 11 files changed, 388 insertions(+), 234 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index ddbe6547213..e00e6d69ce8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -82,7 +82,6 @@ import org.apache.pulsar.broker.namespace.OwnershipCache; import org.apache.pulsar.broker.namespace.ServiceUnitUtils; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; -import org.apache.pulsar.client.impl.BinaryProtoLookupService; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -957,7 +956,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase implements ITe // Assert the lookup service is a "BinaryProtoLookupService". final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; final LookupService lookupService = pulsarClientImpl.getLookup(); - assertTrue(lookupService instanceof BinaryProtoLookupService); + assertTrue(lookupService.isBinaryProtoLookupService()); final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); final int topicPartitions = 10; @@ -981,7 +980,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase implements ITe // Assert the lookup service is a "BinaryProtoLookupService". final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; final LookupService lookupService = pulsarClientImpl.getLookup(); - assertTrue(lookupService instanceof BinaryProtoLookupService); + assertTrue(lookupService.isBinaryProtoLookupService()); final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); admin.topics().createNonPartitionedTopic(tpName); @@ -1234,7 +1233,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase implements ITe Consumer<String> consumer = pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName) .subscriptionName("s1").isAckReceiptEnabled(true).subscribe(); LookupService lookupService = pulsarClientImpl.getLookup(); - assertTrue(lookupService instanceof BinaryProtoLookupService); + assertTrue(lookupService.isBinaryProtoLookupService()); ClientCnx lookupConnection = pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join(); var metricReader = pulsarTestContext.getOpenTelemetryMetricReader(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index 3a7f65f0b07..d581e0a6b32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -49,10 +49,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException; import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException; import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.BinaryProtoLookupService; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; -import org.apache.pulsar.client.impl.HttpLookupService; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -1262,8 +1260,8 @@ public class SimpleSchemaTest extends ProducerConsumerBase { LookupService httpLookupService = httpProtocolClient.getLookup(); LookupService binaryLookupService = binaryProtocolClient.getLookup(); - Assert.assertTrue(httpLookupService instanceof HttpLookupService); - Assert.assertTrue(binaryLookupService instanceof BinaryProtoLookupService); + Assert.assertTrue(!httpLookupService.isBinaryProtoLookupService()); + Assert.assertTrue(binaryLookupService.isBinaryProtoLookupService()); Assert.assertEquals(admin.schemas().getAllSchemas(topic).size(), 2); Assert.assertTrue(httpLookupService.getSchema(TopicName.get(topic), ByteBuffer.allocate(8).putLong(0).array()).get().isPresent()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 368d9c3809a..674fb40793f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -26,17 +26,13 @@ import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang3.mutable.MutableObject; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.metrics.LatencyHistogram; @@ -66,16 +62,6 @@ public class BinaryProtoLookupService implements LookupService { private final int maxLookupRedirects; private final ExecutorService lookupPinnedExecutor; private final boolean createdLookupPinnedExecutor; - - private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>> - lookupInProgress = new ConcurrentHashMap<>(); - - private final ConcurrentHashMap<PartitionedTopicMetadataKey, CompletableFuture<PartitionedTopicMetadata>> - partitionedMetadataInProgress = new ConcurrentHashMap<>(); - - private final ConcurrentHashMap<TopicsUnderNamespaceKey, CompletableFuture<GetTopicsResult>> - topicsUnderNamespaceInProgress = new ConcurrentHashMap<>(); - private final LatencyHistogram histoGetBroker; private final LatencyHistogram histoGetTopicMetadata; private final LatencyHistogram histoGetSchema; @@ -156,32 +142,20 @@ public class BinaryProtoLookupService implements LookupService { * topic-name * @return broker-socket-address that serves given topic */ - public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) { - long startTime = System.nanoTime(); - final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>(); - final Pair<TopicName, Map<String, String>> key = Pair.of(topicName, - client.getConfiguration().getLookupProperties()); - try { - return lookupInProgress.computeIfAbsent(key, tpName -> { - CompletableFuture<LookupTopicResult> newFuture = findBroker(serviceNameResolver.resolveHost(), false, - topicName, 0, key.getRight()); - newFutureCreated.setValue(newFuture); - - newFuture.thenRun(() -> { - histoGetBroker.recordSuccess(System.nanoTime() - startTime); - }).exceptionally(x -> { - histoGetBroker.recordFailure(System.nanoTime() - startTime); - return null; - }); - return newFuture; - }); - } finally { - if (newFutureCreated.getValue() != null) { - newFutureCreated.getValue().whenComplete((v, ex) -> { - lookupInProgress.remove(key, newFutureCreated.getValue()); - }); - } + public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, Map<String, String> lookupProperties) { + if (lookupProperties == null) { + lookupProperties = client.getConfiguration().getLookupProperties(); } + long startTime = System.nanoTime(); + CompletableFuture<LookupTopicResult> newFuture = findBroker(serviceNameResolver.resolveHost(), false, + topicName, 0, lookupProperties); + newFuture.thenRun(() -> { + histoGetBroker.recordSuccess(System.nanoTime() - startTime); + }).exceptionally(x -> { + histoGetBroker.recordFailure(System.nanoTime() - startTime); + return null; + }); + return newFuture; } /** @@ -191,24 +165,7 @@ public class BinaryProtoLookupService implements LookupService { @Override public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata( TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { - final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>(); - final PartitionedTopicMetadataKey key = new PartitionedTopicMetadataKey( - topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); - try { - return partitionedMetadataInProgress.computeIfAbsent(key, k -> { - CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadataAsync( - topicName, metadataAutoCreationEnabled, - useFallbackForNonPIP344Brokers); - newFutureCreated.setValue(newFuture); - return newFuture; - }); - } finally { - if (newFutureCreated.getValue() != null) { - newFutureCreated.getValue().whenComplete((v, ex) -> { - partitionedMetadataInProgress.remove(key, newFutureCreated.getValue()); - }); - } - } + return getPartitionedTopicMetadataAsync(topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); } private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socketAddress, @@ -351,12 +308,6 @@ public class BinaryProtoLookupService implements LookupService { return partitionFuture; } - @Override - public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) { - return getSchema(topicName, null); - } - - @Override public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) { long startTime = System.nanoTime(); @@ -403,31 +354,21 @@ public class BinaryProtoLookupService implements LookupService { Mode mode, String topicsPattern, String topicsHash) { - final MutableObject<CompletableFuture<GetTopicsResult>> newFutureCreated = new MutableObject<>(); - final TopicsUnderNamespaceKey key = new TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash); - - try { - return topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> { - CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>(); - AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) - .setMax(1, TimeUnit.MINUTES) - .create(); - - newFutureCreated.setValue(topicsFuture); - getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode, - topicsPattern, topicsHash); - return topicsFuture; - }); - } finally { - if (newFutureCreated.getValue() != null) { - newFutureCreated.getValue().whenComplete((v, ex) -> { - topicsUnderNamespaceInProgress.remove(key, newFutureCreated.getValue()); - }); - } - } + CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>(); + AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) + .setMax(1, TimeUnit.MINUTES) + .create(); + getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode, + topicsPattern, topicsHash); + return topicsFuture; + } + + @Override + public boolean isBinaryProtoLookupService() { + return true; } private void getTopicsUnderNamespace( @@ -517,93 +458,5 @@ public class BinaryProtoLookupService implements LookupService { } - private static final class TopicsUnderNamespaceKey { - private final NamespaceName namespace; - private final Mode mode; - private final String topicsPattern; - private final String topicsHash; - - TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, - String topicsPattern, String topicsHash) { - this.namespace = namespace; - this.mode = mode; - this.topicsPattern = topicsPattern; - this.topicsHash = topicsHash; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o; - return Objects.equals(namespace, that.namespace) - && mode == that.mode - && Objects.equals(topicsPattern, that.topicsPattern) - && Objects.equals(topicsHash, that.topicsHash); - } - - @Override - public int hashCode() { - return Objects.hash(namespace, mode, topicsPattern, topicsHash); - } - - @Override - public String toString() { - return "TopicsUnderNamespaceKey{" - + "namespace=" + namespace - + ", mode=" + mode - + ", topicsPattern='" + topicsPattern + '\'' - + ", topicsHash='" + topicsHash + '\'' - + '}'; - } - } - - private static final class PartitionedTopicMetadataKey { - private final TopicName topicName; - private final boolean metadataAutoCreationEnabled; - private final boolean useFallbackForNonPIP344Brokers; - - PartitionedTopicMetadataKey(TopicName topicName, - boolean metadataAutoCreationEnabled, - boolean useFallbackForNonPIP344Brokers) { - this.topicName = topicName; - this.metadataAutoCreationEnabled = metadataAutoCreationEnabled; - this.useFallbackForNonPIP344Brokers = useFallbackForNonPIP344Brokers; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o; - return metadataAutoCreationEnabled == that.metadataAutoCreationEnabled - && useFallbackForNonPIP344Brokers == that.useFallbackForNonPIP344Brokers - && Objects.equals(topicName, that.topicName); - } - - @Override - public int hashCode() { - return Objects.hash(topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); - } - - @Override - public String toString() { - return "PartitionedTopicMetadataKey{" - + "topicName=" + topicName - + ", metadataAutoCreationEnabled=" + metadataAutoCreationEnabled - + ", useFallbackForNonPIP344Brokers=" + useFallbackForNonPIP344Brokers - + '}'; - } - } - - private static final Logger log = LoggerFactory.getLogger(BinaryProtoLookupService.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 08c9956b5be..9044088f724 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -28,6 +28,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; @@ -105,7 +106,14 @@ public class HttpLookupService implements LookupService { */ @Override @SuppressWarnings("deprecation") - public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) { + public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, Map<String, String> lookupProperties) { + if (lookupProperties == null) { + lookupProperties = httpClient.clientConf.getLookupProperties(); + } + if (lookupProperties != null && !lookupProperties.isEmpty()) { + log.warn("Lookup properties aren't supported for http lookup service. lookupProperties: {}", + lookupProperties); + } String basePath = topicName.isV2() ? BasePathV2 : BasePathV1; String path = basePath + topicName.getLookupName(); path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName); @@ -211,8 +219,8 @@ public class HttpLookupService implements LookupService { } @Override - public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) { - return getSchema(topicName, null); + public boolean isBinaryProtoLookupService() { + return false; } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java new file mode 100644 index 00000000000..8cf95b62634 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/InProgressDeduplicationDecoratorLookupService.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; +import org.apache.pulsar.common.lookup.GetTopicsResult; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.schema.SchemaInfo; + +/** + * Decorator for {@link LookupService} that deduplicates in-progress lookups for topics, schemas, partitioned topics + * and topic listings for namespace. + */ +public class InProgressDeduplicationDecoratorLookupService implements LookupService { + private final LookupService delegate; + private final Supplier<Map<String, String>> lookupPropertiesSupplier; + private final InProgressHolder<LookupBrokerKey, CompletableFuture<LookupTopicResult>> topicLookupsInProgress = + new InProgressHolder<>(); + private final InProgressHolder<PartitionedTopicMetadataKey, CompletableFuture<PartitionedTopicMetadata>> + partitionedTopicMetadataInProgress = new InProgressHolder<>(); + private final InProgressHolder<LookupSchemaKey, CompletableFuture<Optional<SchemaInfo>>> schemasInProgress = + new InProgressHolder<>(); + private final InProgressHolder<TopicsUnderNamespaceKey, CompletableFuture<GetTopicsResult>> + topicsUnderNamespaceInProgress = new InProgressHolder<>(); + + public InProgressDeduplicationDecoratorLookupService(LookupService delegate, + Supplier<Map<String, String>> lookupPropertiesSupplier) { + this.delegate = delegate; + this.lookupPropertiesSupplier = lookupPropertiesSupplier; + } + + @Override + public void updateServiceUrl(String serviceUrl) throws PulsarClientException { + delegate.updateServiceUrl(serviceUrl); + } + + @Override + public String getServiceUrl() { + return delegate.getServiceUrl(); + } + + @Override + public InetSocketAddress resolveHost() { + return delegate.resolveHost(); + } + + @Override + public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, Map<String, String> lookupProperties) { + Map<String, String> lookupPropertiesToUse = + lookupProperties != null ? lookupProperties : lookupPropertiesSupplier.get(); + return topicLookupsInProgress.getOrComputeIfAbsent( + new LookupBrokerKey(topicName.toString(), lookupPropertiesToUse), + () -> delegate.getBroker(topicName, lookupPropertiesToUse)); + } + + @Override + public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata( + TopicName topicName, + boolean metadataAutoCreationEnabled, + boolean useFallbackForNonPIP344Brokers) { + return partitionedTopicMetadataInProgress.getOrComputeIfAbsent( + new PartitionedTopicMetadataKey(topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers), + () -> delegate.getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled, + useFallbackForNonPIP344Brokers)); + } + + @Override + public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) { + // all partitions of a partitioned topic share the same schema + // therefore, perform the lookup with the partitioned topic name + String topicForSchemaLookup = topicName.getPartitionedTopicName(); + return schemasInProgress.getOrComputeIfAbsent(new LookupSchemaKey(topicForSchemaLookup, version), + () -> delegate.getSchema(TopicName.get(topicForSchemaLookup), version)); + } + + @Override + public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode, + String topicPattern, String topicsHash) { + return topicsUnderNamespaceInProgress.getOrComputeIfAbsent( + new TopicsUnderNamespaceKey(namespace, mode, topicPattern, topicsHash), + () -> delegate.getTopicsUnderNamespace(namespace, mode, topicPattern, topicsHash)); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + + @Override + public boolean isBinaryProtoLookupService() { + return delegate.isBinaryProtoLookupService(); + } + + private static class InProgressHolder<K, V extends CompletableFuture<?>> { + private final ConcurrentHashMap<K, V> inProgress = new ConcurrentHashMap<>(); + + public V getOrComputeIfAbsent(K key, Supplier<V> supplier) { + final MutableObject<V> newFutureCreated = new MutableObject<>(); + try { + return inProgress.computeIfAbsent(key, k -> { + V newFuture = supplier.get(); + newFutureCreated.setValue(newFuture); + return newFuture; + }); + } finally { + V newFutureCreatedValue = newFutureCreated.getValue(); + if (newFutureCreatedValue != null) { + newFutureCreatedValue.whenComplete((v, ex) -> { + inProgress.remove(key, newFutureCreatedValue); + }); + } + } + } + } + + private static final class LookupBrokerKey { + private final String topic; + private final Map<String, String> properties; + + private LookupBrokerKey(String topic, Map<String, String> properties) { + this.topic = topic; + this.properties = properties.isEmpty() ? Collections.emptyMap() : new HashMap<>(properties); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + LookupBrokerKey lookupBrokerKey = (LookupBrokerKey) o; + return Objects.equals(topic, lookupBrokerKey.topic) && properties.equals(lookupBrokerKey.properties); + } + + @Override + public int hashCode() { + int result = Objects.hashCode(topic); + result = 31 * result + properties.hashCode(); + return result; + } + } + + private static final class LookupSchemaKey { + private final String topic; + private final byte[] version; + + private LookupSchemaKey(String topic, byte[] version) { + this.topic = topic; + this.version = version != null ? version.clone() : null; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + LookupSchemaKey that = (LookupSchemaKey) o; + return Objects.equals(topic, that.topic) && Arrays.equals(version, that.version); + } + + @Override + public int hashCode() { + int result = Objects.hashCode(topic); + result = 31 * result + Arrays.hashCode(version); + return result; + } + } + + private static final class TopicsUnderNamespaceKey { + private final NamespaceName namespace; + private final Mode mode; + private final String topicsPattern; + private final String topicsHash; + + TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, String topicsPattern, String topicsHash) { + this.namespace = namespace; + this.mode = mode; + this.topicsPattern = topicsPattern; + this.topicsHash = topicsHash; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopicsUnderNamespaceKey that = (TopicsUnderNamespaceKey) o; + return Objects.equals(namespace, that.namespace) && mode == that.mode && Objects.equals(topicsPattern, + that.topicsPattern) && Objects.equals(topicsHash, that.topicsHash); + } + + @Override + public int hashCode() { + return Objects.hash(namespace, mode, topicsPattern, topicsHash); + } + + @Override + public String toString() { + return "TopicsUnderNamespaceKey{" + "namespace=" + namespace + ", mode=" + mode + ", topicsPattern='" + + topicsPattern + '\'' + ", topicsHash='" + topicsHash + '\'' + '}'; + } + } + + private static final class PartitionedTopicMetadataKey { + private final TopicName topicName; + private final boolean metadataAutoCreationEnabled; + private final boolean useFallbackForNonPIP344Brokers; + + PartitionedTopicMetadataKey(TopicName topicName, boolean metadataAutoCreationEnabled, + boolean useFallbackForNonPIP344Brokers) { + this.topicName = topicName; + this.metadataAutoCreationEnabled = metadataAutoCreationEnabled; + this.useFallbackForNonPIP344Brokers = useFallbackForNonPIP344Brokers; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionedTopicMetadataKey that = (PartitionedTopicMetadataKey) o; + return metadataAutoCreationEnabled == that.metadataAutoCreationEnabled + && useFallbackForNonPIP344Brokers == that.useFallbackForNonPIP344Brokers && Objects.equals( + topicName, that.topicName); + } + + @Override + public int hashCode() { + return Objects.hash(topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); + } + + @Override + public String toString() { + return "PartitionedTopicMetadataKey{" + "topicName=" + topicName + ", metadataAutoCreationEnabled=" + + metadataAutoCreationEnabled + ", useFallbackForNonPIP344Brokers=" + useFallbackForNonPIP344Brokers + + '}'; + } + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 3367ae99cb1..67cfe449ec9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import java.net.InetSocketAddress; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.PulsarClientException; @@ -56,7 +57,21 @@ public interface LookupService extends AutoCloseable { * @return a {@link LookupTopicResult} representing the logical and physical address of the broker that serves the * given topic, as well as proxying information. */ - CompletableFuture<LookupTopicResult> getBroker(TopicName topicName); + default CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) { + return getBroker(topicName, null); + } + + /** + * Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given + * topic. This lookup is made with the given lookup properties. When null is passed, the + * default lookup properties specified in the client configuration are used. + * + * @param topicName + * topic-name + * @return a {@link LookupTopicResult} representing the logical and physical address of the broker that serves the + * given topic, as well as proxying information. + */ + CompletableFuture<LookupTopicResult> getBroker(TopicName topicName, Map<String, String> lookupProperties); /** * Returns {@link PartitionedTopicMetadata} for a given topic. @@ -104,7 +119,9 @@ public interface LookupService extends AutoCloseable { * @param topicName topic-name * @return SchemaInfo */ - CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName); + default CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) { + return getSchema(topicName, null); + } /** * Returns specific version SchemaInfo {@link SchemaInfo} for a given topic. @@ -144,4 +161,9 @@ public interface LookupService extends AutoCloseable { */ CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode, String topicPattern, String topicsHash); + + /** + * Returns true if the lookup service is a binary protocol lookup service. + */ + boolean isBinaryProtoLookupService(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index d8b27e7264b..27c0010c7df 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -271,14 +271,7 @@ public class PulsarClientImpl implements PulsarClient { } else { this.timer = timer; } - if (conf.getServiceUrl().startsWith("http")) { - lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup, this.timer, - getNameResolver()); - } else { - lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), - this.lookupExecutorProvider.getExecutor()); - } + lookup = createLookup(conf.getServiceUrl()); if (conf.getServiceUrlProvider() != null) { conf.getServiceUrlProvider().initialize(this); @@ -1154,7 +1147,7 @@ public class PulsarClientImpl implements PulsarClient { } public CompletableFuture<ClientCnx> getConnectionToServiceUrl() { - if (!(lookup instanceof BinaryProtoLookupService)) { + if (!lookup.isBinaryProtoLookupService()) { return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL( "Can't get client connection to HTTP service URL", null)); } @@ -1164,7 +1157,7 @@ public class PulsarClientImpl implements PulsarClient { public CompletableFuture<ClientCnx> getProxyConnection(final InetSocketAddress logicalAddress, final int randomKeyForSelectConnection) { - if (!(lookup instanceof BinaryProtoLookupService)) { + if (!lookup.isBinaryProtoLookupService()) { return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL( "Cannot proxy connection through HTTP service URL", null)); } @@ -1233,12 +1226,15 @@ public class PulsarClientImpl implements PulsarClient { } public LookupService createLookup(String url) throws PulsarClientException { + LookupService lookupService; if (url.startsWith("http")) { - return new HttpLookupService(instrumentProvider, conf, eventLoopGroup, timer, getNameResolver()); + lookupService = new HttpLookupService(instrumentProvider, conf, eventLoopGroup, timer, getNameResolver()); } else { - return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(), - externalExecutorProvider.getExecutor()); + lookupService = new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(), + this.scheduledExecutorProvider.getExecutor(), this.lookupExecutorProvider.getExecutor()); } + return new InProgressDeduplicationDecoratorLookupService(lookupService, + () -> getConfiguration().getLookupProperties()); } /** diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 8bc78264ec7..7b9d7b37e5e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -37,6 +37,7 @@ import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -204,6 +205,10 @@ public class BinaryProtoLookupServiceTest { return lookupResult; } + private LookupService decoratedLookupService(LookupService lookupService) { + return new InProgressDeduplicationDecoratorLookupService(lookupService, () -> Collections.emptyMap()); + } + /** * Verifies that getTopicsUnderNamespace() deduplicates concurrent requests and cleans up after completion. * @@ -236,8 +241,8 @@ public class BinaryProtoLookupServiceTest { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); - try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( - client, "pulsar://broker:6650", null, false, scheduler, /*lookupPinnedExecutor*/ null)) { + try (LookupService lookup = decoratedLookupService(new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, /*lookupPinnedExecutor*/ null))) { NamespaceName ns = NamespaceName.get("public", "default"); Mode mode = Mode.PERSISTENT; @@ -295,8 +300,8 @@ public class BinaryProtoLookupServiceTest { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); - try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( - client, "pulsar://broker:6650", null, false, scheduler, null)) { + try (LookupService lookup = decoratedLookupService(new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, null))) { NamespaceName ns = NamespaceName.get("public", "default"); Mode mode = Mode.PERSISTENT; @@ -366,8 +371,8 @@ public class BinaryProtoLookupServiceTest { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); - try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( - client, "pulsar://broker:6650", null, false, scheduler, null)) { + try (LookupService lookup = decoratedLookupService(new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, null))) { TopicName topic = TopicName.get("persistent://public/default/t1"); boolean metadataAutoCreationEnabled = true; @@ -426,8 +431,8 @@ public class BinaryProtoLookupServiceTest { ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lookup-test-sched")); - try (BinaryProtoLookupService lookup = new BinaryProtoLookupService( - client, "pulsar://broker:6650", null, false, scheduler, null)) { + try (LookupService lookup = decoratedLookupService(new BinaryProtoLookupService( + client, "pulsar://broker:6650", null, false, scheduler, null))) { TopicName topic = TopicName.get("persistent://public/default/t1"); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java index 00abe6c55c4..83321d0f776 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.BinaryProtoLookupService; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -135,7 +134,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest { .serviceUrl(proxyService.getServiceUrl()).build(); String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); LookupService lookupService = client.getLookup(); - assertTrue(lookupService instanceof BinaryProtoLookupService); + assertTrue(lookupService.isBinaryProtoLookupService()); ClientCnx lookupConnection = client.getCnxPool().getConnection(lookupService.resolveHost()).join(); // Make no permits to lookup. diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java index f0f8a3bbcae..2529c929a6f 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -190,7 +191,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest { var producerClient = producerClientFuture.get(); @Cleanup var producer = producerClient.newProducer(Schema.INT32).topic(topicName.toString()).create(); - LookupService producerLookupServiceSpy = spyField(producerClient, "lookup"); + LookupService producerLookupServiceSpy = spyField(producerClient.getLookup(), "delegate"); @Cleanup var consumerClient = consumerClientFuture.get(); @@ -200,7 +201,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest { subscriptionName(BrokerTestUtil.newUniqueName("my-sub")). ackTimeout(1000, TimeUnit.MILLISECONDS). subscribe(); - LookupService consumerLookupServiceSpy = spyField(consumerClient, "lookup"); + LookupService consumerLookupServiceSpy = spyField(consumerClient.getLookup(), "delegate"); var bundleRange = admin.lookups().getBundleRange(topicName.toString()); @@ -268,7 +269,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest { @Cleanup var producer = (ProducerImpl<Integer>) producerClient.newProducer(Schema.INT32).topic(topicName.toString()). create(); - LookupService producerLookupServiceSpy = spyField(producerClient, "lookup"); + LookupService producerLookupServiceSpy = spyField(producerClient.getLookup(), "delegate"); when(((ServiceNameResolver) spyField(producerLookupServiceSpy, "serviceNameResolver")).resolveHost()). thenCallRealMethod().then(invocation -> getSourceBrokerInetAddress(topicName)); @@ -280,7 +281,7 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest { subscriptionName(BrokerTestUtil.newUniqueName("my-sub")). ackTimeout(1000, TimeUnit.MILLISECONDS). subscribe(); - LookupService consumerLookupServiceSpy = spyField(consumerClient, "lookup"); + LookupService consumerLookupServiceSpy = spyField(consumerClient.getLookup(), "delegate"); when(((ServiceNameResolver) spyField(consumerLookupServiceSpy, "serviceNameResolver")).resolveHost()). thenCallRealMethod().then(invocation -> getSourceBrokerInetAddress(topicName)); @@ -341,9 +342,9 @@ public class ProxyWithExtensibleLoadManagerTest extends MultiBrokerBaseTest { assertEquals(FieldUtils.readDeclaredField(consumer.getConnectionHandler(), "useProxy", true), Boolean.FALSE); verify(producerClient, times(1)).getProxyConnection(any(), anyInt()); - verify(producerLookupServiceSpy, times(1)).getBroker(topicName); + verify(producerLookupServiceSpy, times(1)).getBroker(eq(topicName), any()); verify(consumerClient, times(1)).getProxyConnection(any(), anyInt()); - verify(consumerLookupServiceSpy, times(1)).getBroker(topicName); + verify(consumerLookupServiceSpy, times(1)).getBroker(eq(topicName), any()); } } diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java index 67dff01e1cc..da7c2892936 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/LookupProtocolTest.java @@ -19,7 +19,9 @@ package org.apache.pulsar.websocket; import io.netty.channel.epoll.Epoll; +import java.io.IOException; import java.lang.reflect.Field; +import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration; import org.testng.Assert; @@ -38,12 +40,17 @@ public class LookupProtocolTest { conf.setServiceUrl("http://localhost:8080"); conf.setServiceUrlTls("https://localhost:8443"); WebSocketService service = new WebSocketService(conf); + assertLookupIsBinaryProtoLookup(service, false); + service.close(); + } + + private static void assertLookupIsBinaryProtoLookup(WebSocketService service, boolean expected) + throws IOException, NoSuchFieldException, IllegalAccessException { PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient(); Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); lookupField.setAccessible(true); - Assert.assertEquals(lookupField.get(testClient).getClass().getName(), - "org.apache.pulsar.client.impl.HttpLookupService"); - service.close(); + LookupService lookupService = (LookupService) lookupField.get(testClient); + Assert.assertEquals(expected, lookupService.isBinaryProtoLookupService()); } @Test(timeOut = 10000) @@ -55,10 +62,7 @@ public class LookupProtocolTest { conf.setBrokerClientTlsEnabled(true); WebSocketService service = new WebSocketService(conf); PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient(); - Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); - lookupField.setAccessible(true); - Assert.assertEquals(lookupField.get(testClient).getClass().getName(), - "org.apache.pulsar.client.impl.HttpLookupService"); + assertLookupIsBinaryProtoLookup(service, false); Assert.assertTrue(testClient.getConfiguration().isUseTls()); service.close(); } @@ -71,11 +75,7 @@ public class LookupProtocolTest { conf.setBrokerServiceUrl("pulsar://localhost:6650"); conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651"); WebSocketService service = new WebSocketService(conf); - PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient(); - Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); - lookupField.setAccessible(true); - Assert.assertEquals(lookupField.get(testClient).getClass().getName(), - "org.apache.pulsar.client.impl.BinaryProtoLookupService"); + assertLookupIsBinaryProtoLookup(service, true); service.close(); } @@ -89,10 +89,7 @@ public class LookupProtocolTest { conf.setBrokerClientTlsEnabled(true); WebSocketService service = new WebSocketService(conf); PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient(); - Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup"); - lookupField.setAccessible(true); - Assert.assertEquals(lookupField.get(testClient).getClass().getName(), - "org.apache.pulsar.client.impl.BinaryProtoLookupService"); + assertLookupIsBinaryProtoLookup(service, true); Assert.assertTrue(testClient.getConfiguration().isUseTls()); service.close(); }
