This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d384036afa [cleanup] Remove fastutil dependency (#25413)
3d384036afa is described below
commit 3d384036afa8ab44ecc2c6105c9f480ea42a54b1
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Mar 27 15:14:11 2026 -0700
[cleanup] Remove fastutil dependency (#25413)
---
.../pulsar.client-shade-conventions.gradle.kts | 2 -
distribution/server/src/assemble/LICENSE.bin.txt | 1 -
distribution/shell/src/assemble/LICENSE.bin.txt | 1 -
gradle/libs.versions.toml | 2 -
pulsar-broker/build.gradle.kts | 1 -
.../delayed/InMemoryDelayedDeliveryTracker.java | 30 +--
.../org/apache/pulsar/broker/service/Consumer.java | 4 +-
.../broker/service/DrainingHashesTracker.java | 2 +-
.../broker/service/InMemoryRedeliveryTracker.java | 8 +-
.../pulsar/broker/service/PendingAcksMap.java | 62 ++---
...istentStickyKeyDispatcherMultipleConsumers.java | 5 +-
.../delayed/InMemoryDeliveryTrackerTest.java | 2 +-
pulsar-client-admin-shaded/build.gradle.kts | 5 +-
pulsar-client-all/build.gradle.kts | 5 +-
.../build.gradle.kts | 180 -------------
pulsar-client-shaded/build.gradle.kts | 5 +-
pulsar-client/build.gradle.kts | 1 -
.../pulsar/client/impl/NegativeAcksTracker.java | 27 +-
.../util/collections/Int2ObjectOpenHashMap.java | 193 ++++++++++++++
.../pulsar/common/util/collections/IntIntPair.java | 16 +-
.../common/util/collections/IntOpenHashSet.java | 123 +++++++++
.../common/util/collections/Long2IntMap.java | 83 ++++++
.../util/collections/Long2IntOpenHashMap.java | 190 ++++++++++++++
.../common/util/collections/Long2ObjectMap.java | 98 ++++++++
.../util/collections/Long2ObjectOpenHashMap.java | 277 +++++++++++++++++++++
.../common/util/collections/LongObjConsumer.java | 17 +-
.../common/util/collections/LongOpenHashSet.java | 159 ++++++++++++
.../common/util/collections/ObjectIntPair.java | 16 +-
.../common/util/collections/HashSetTest.java | 105 ++++++++
.../collections/Int2ObjectOpenHashMapTest.java | 88 +++++++
.../util/collections/Long2IntOpenHashMapTest.java | 100 ++++++++
.../collections/Long2ObjectOpenHashMapTest.java | 179 +++++++++++++
settings.gradle.kts | 3 -
33 files changed, 1686 insertions(+), 304 deletions(-)
diff --git
a/buildSrc/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
b/buildSrc/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
index 04da28be1a7..f67f5a70147 100644
--- a/buildSrc/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
+++ b/buildSrc/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
@@ -37,7 +37,6 @@
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
include(project(":pulsar-client-admin-original"))
include(project(":pulsar-common"))
include(project(":pulsar-client-messagecrypto-bc"))
- include(project(":pulsar-client-dependencies-minimized"))
include(dependency("com.fasterxml.jackson.*:.*"))
include(dependency("com.google.*:.*"))
include(dependency("com.google.auth:.*"))
@@ -147,7 +146,6 @@
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
relocateWithPrefix(shadePrefix, "io.opencensus")
relocateWithPrefix(shadePrefix, "io.prometheus.client")
relocateWithPrefix(shadePrefix, "io.swagger")
- relocateWithPrefix(shadePrefix, "it.unimi.dsi.fastutil")
relocateWithPrefix(shadePrefix, "javassist")
relocateWithPrefix(shadePrefix, "javax.activation")
relocateWithPrefix(shadePrefix, "javax.annotation")
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index 6142500537e..5177464d39e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -262,7 +262,6 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.18.6.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.3.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
- * Fastutil -- it.unimi.dsi-fastutil-8.5.16.jar
* Proto Google Common Protos --
com.google.api.grpc-proto-google-common-protos-2.59.2.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.6.jar
* Gson
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt
b/distribution/shell/src/assemble/LICENSE.bin.txt
index ebeee2013ce..70cc3a61fa3 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -422,7 +422,6 @@ The Apache Software License, Version 2.0
* RE2j -- re2j-1.8.jar
* Spotify completable-futures -- completable-futures-0.3.6.jar
* RoaringBitmap -- RoaringBitmap-1.6.9.jar
- * Fastutil -- fastutil-8.5.16.jar
* JSpecify -- jspecify-1.0.0.jar
BSD 3-clause "New" or "Revised" License
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index fe80bbc4d5c..1bc19f6581b 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -98,7 +98,6 @@ opentelemetry-gcp-resources = "1.48.0-alpha"
# Data structures / Utils
guava = "33.4.8-jre"
caffeine = "3.2.3"
-fastutil = "8.5.16"
jctools = "4.0.5"
roaringbitmap = "1.6.9"
hppc = "0.9.1"
@@ -370,7 +369,6 @@ error-prone-annotations = { module =
"com.google.errorprone:error_prone_annotati
# Data structures
caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref =
"caffeine" }
-fastutil = { module = "it.unimi.dsi:fastutil", version.ref = "fastutil" }
jctools-core = { module = "org.jctools:jctools-core", version.ref = "jctools" }
roaringbitmap = { module = "org.roaringbitmap:RoaringBitmap", version.ref =
"roaringbitmap" }
hppc = { module = "com.carrotsearch:hppc", version.ref = "hppc" }
diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index ca62d9e9112..9baa9043322 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -46,7 +46,6 @@ dependencies {
implementation(libs.slf4j.api)
implementation(libs.netty.transport)
implementation(libs.protobuf.java)
- implementation(libs.fastutil)
implementation(libs.curator.recipes)
implementation(libs.bookkeeper.stream.storage.server) {
exclude(group = "org.apache.bookkeeper")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index ad5ab25fbbf..9c58f0a228a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -20,14 +20,9 @@ package org.apache.pulsar.broker.delayed;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
-import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.longs.LongSet;
import java.time.Clock;
import java.util.NavigableSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,15 +31,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.common.util.collections.LongOpenHashSet;
import org.roaringbitmap.longlong.Roaring64Bitmap;
@Slf4j
public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker {
// timestamp -> ledgerId -> entryId
- // AVL tree -> OpenHashMap -> RoaringBitmap
- protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
- delayedMessageMap = new Long2ObjectAVLTreeMap<>();
+ // TreeMap -> TreeMap -> RoaringBitmap
+ protected final TreeMap<Long, TreeMap<Long, Roaring64Bitmap>>
+ delayedMessageMap = new TreeMap<>();
// If we detect that all messages have fixed delay time, such that the
delivery is
// always going to be in FIFO order, then we can avoid pulling all the
messages in
@@ -126,7 +122,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
}
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
- delayedMessageMap.computeIfAbsent(timestamp, k -> new
Long2ObjectRBTreeMap<>())
+ delayedMessageMap.computeIfAbsent(timestamp, k -> new TreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
delayedMessagesCount.incrementAndGet();
@@ -156,7 +152,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
@Override
public boolean hasMessageAvailable() {
boolean hasMessageAvailable = !delayedMessageMap.isEmpty()
- && delayedMessageMap.firstLongKey() <= getCutoffTime();
+ && delayedMessageMap.firstKey() <= getCutoffTime();
if (!hasMessageAvailable) {
updateTimer();
}
@@ -173,15 +169,15 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
long cutoffTime = getCutoffTime();
while (n > 0 && !delayedMessageMap.isEmpty()) {
- long timestamp = delayedMessageMap.firstLongKey();
+ long timestamp = delayedMessageMap.firstKey();
if (timestamp > cutoffTime) {
break;
}
- LongSet ledgerIdToDelete = new LongOpenHashSet();
- Long2ObjectSortedMap<Roaring64Bitmap> ledgerMap =
delayedMessageMap.get(timestamp);
- for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry :
ledgerMap.long2ObjectEntrySet()) {
- long ledgerId = ledgerEntry.getLongKey();
+ LongOpenHashSet ledgerIdToDelete = new LongOpenHashSet();
+ TreeMap<Long, Roaring64Bitmap> ledgerMap =
delayedMessageMap.get(timestamp);
+ for (var ledgerEntry : ledgerMap.entrySet()) {
+ long ledgerId = ledgerEntry.getKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
int cardinality = (int) entryIds.getLongCardinality();
if (cardinality <= n) {
@@ -270,6 +266,6 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
}
protected long nextDeliveryTime() {
- return delayedMessageMap.firstLongKey();
+ return delayedMessageMap.firstKey();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3edf6e2d681..27a7dddf29f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
-import it.unimi.dsi.fastutil.ints.IntIntPair;
-import it.unimi.dsi.fastutil.objects.ObjectIntPair;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
@@ -73,6 +71,8 @@ import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.IntIntPair;
+import org.apache.pulsar.common.util.collections.ObjectIntPair;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
index 51c45817368..0f12f72f7f7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -33,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.DrainingHash;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl;
+import org.apache.pulsar.common.util.collections.Int2ObjectOpenHashMap;
import org.roaringbitmap.RoaringBitmap;
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
index 66956205521..0d684eb1736 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -18,13 +18,13 @@
*/
package org.apache.pulsar.broker.service;
-import it.unimi.dsi.fastutil.longs.Long2IntMap;
-import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.util.collections.Long2IntMap;
+import org.apache.pulsar.common.util.collections.Long2IntOpenHashMap;
+import org.apache.pulsar.common.util.collections.Long2ObjectMap;
+import org.apache.pulsar.common.util.collections.Long2ObjectOpenHashMap;
public class InMemoryRedeliveryTracker implements RedeliveryTracker {
// ledgerId -> entryId -> count
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 7a728a037dc..87c44283bd3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -18,16 +18,14 @@
*/
package org.apache.pulsar.broker.service;
-import it.unimi.dsi.fastutil.ints.IntIntPair;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
-import it.unimi.dsi.fastutil.objects.ObjectBidirectionalIterator;
+import java.util.Iterator;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import org.apache.pulsar.common.util.collections.IntIntPair;
/**
* A thread-safe map to store pending acks in the consumer.
@@ -98,7 +96,7 @@ public class PendingAcksMap {
}
private final Consumer consumer;
- private final Long2ObjectSortedMap<Long2ObjectSortedMap<IntIntPair>>
pendingAcks;
+ private final TreeMap<Long, TreeMap<Long, IntIntPair>> pendingAcks;
private final Supplier<PendingAcksAddHandler>
pendingAcksAddHandlerSupplier;
private final Supplier<PendingAcksRemoveHandler>
pendingAcksRemoveHandlerSupplier;
private final Lock readLock;
@@ -108,7 +106,7 @@ public class PendingAcksMap {
PendingAcksMap(Consumer consumer, Supplier<PendingAcksAddHandler>
pendingAcksAddHandlerSupplier,
Supplier<PendingAcksRemoveHandler>
pendingAcksRemoveHandlerSupplier) {
this.consumer = consumer;
- this.pendingAcks = new Long2ObjectRBTreeMap<>();
+ this.pendingAcks = new TreeMap<>();
this.pendingAcksAddHandlerSupplier = pendingAcksAddHandlerSupplier;
this.pendingAcksRemoveHandlerSupplier =
pendingAcksRemoveHandlerSupplier;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -143,8 +141,8 @@ public class PendingAcksMap {
&& !pendingAcksAddHandler.handleAdding(consumer, ledgerId,
entryId, stickyKeyHash)) {
return false;
}
- Long2ObjectSortedMap<IntIntPair> ledgerPendingAcks =
- pendingAcks.computeIfAbsent(ledgerId, k -> new
Long2ObjectRBTreeMap<>());
+ TreeMap<Long, IntIntPair> ledgerPendingAcks =
+ pendingAcks.computeIfAbsent(ledgerId, k -> new
TreeMap<>());
ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize,
stickyKeyHash));
return true;
} finally {
@@ -160,7 +158,7 @@ public class PendingAcksMap {
public long size() {
try {
readLock.lock();
- return
pendingAcks.values().stream().mapToInt(Long2ObjectSortedMap::size).sum();
+ return pendingAcks.values().stream().mapToInt(TreeMap::size).sum();
} finally {
readLock.unlock();
}
@@ -184,12 +182,12 @@ public class PendingAcksMap {
private void processPendingAcks(PendingAcksConsumer processor) {
// this code uses for loops intentionally, don't refactor to use
forEach
// iterate the outer map
- for (Map.Entry<Long, Long2ObjectSortedMap<IntIntPair>> entry :
pendingAcks.entrySet()) {
- Long ledgerId = entry.getKey();
- Long2ObjectSortedMap<IntIntPair> ledgerPendingAcks =
entry.getValue();
+ for (Map.Entry<Long, TreeMap<Long, IntIntPair>> entry :
pendingAcks.entrySet()) {
+ long ledgerId = entry.getKey();
+ TreeMap<Long, IntIntPair> ledgerPendingAcks = entry.getValue();
// iterate the inner map
for (Map.Entry<Long, IntIntPair> e : ledgerPendingAcks.entrySet())
{
- Long entryId = e.getKey();
+ long entryId = e.getKey();
IntIntPair batchSizeAndStickyKeyHash = e.getValue();
processor.accept(ledgerId, entryId,
batchSizeAndStickyKeyHash.leftInt(),
batchSizeAndStickyKeyHash.rightInt());
@@ -237,7 +235,7 @@ public class PendingAcksMap {
public boolean contains(long ledgerId, long entryId) {
try {
readLock.lock();
- Long2ObjectSortedMap<IntIntPair> ledgerMap =
pendingAcks.get(ledgerId);
+ TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
if (ledgerMap == null) {
return false;
}
@@ -257,7 +255,7 @@ public class PendingAcksMap {
public IntIntPair get(long ledgerId, long entryId) {
try {
readLock.lock();
- Long2ObjectSortedMap<IntIntPair> ledgerMap =
pendingAcks.get(ledgerId);
+ TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
if (ledgerMap == null) {
return null;
}
@@ -279,7 +277,7 @@ public class PendingAcksMap {
public boolean remove(long ledgerId, long entryId, int batchSize, int
stickyKeyHash) {
try {
writeLock.lock();
- Long2ObjectSortedMap<IntIntPair> ledgerMap =
pendingAcks.get(ledgerId);
+ TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
if (ledgerMap == null) {
return false;
}
@@ -306,7 +304,7 @@ public class PendingAcksMap {
public boolean remove(long ledgerId, long entryId) {
try {
writeLock.lock();
- Long2ObjectSortedMap<IntIntPair> ledgerMap =
pendingAcks.get(ledgerId);
+ TreeMap<Long, IntIntPair> ledgerMap = pendingAcks.get(ledgerId);
if (ledgerMap == null) {
return false;
}
@@ -361,23 +359,23 @@ public class PendingAcksMap {
} else {
readLock.lock();
}
-
ObjectBidirectionalIterator<Long2ObjectMap.Entry<Long2ObjectSortedMap<IntIntPair>>>
ledgerMapIterator =
- pendingAcks.headMap(markDeleteLedgerId +
1).long2ObjectEntrySet().iterator();
+ Iterator<Map.Entry<Long, TreeMap<Long, IntIntPair>>>
ledgerMapIterator =
+ pendingAcks.headMap(markDeleteLedgerId +
1).entrySet().iterator();
while (ledgerMapIterator.hasNext()) {
- Long2ObjectMap.Entry<Long2ObjectSortedMap<IntIntPair>> entry =
ledgerMapIterator.next();
- long ledgerId = entry.getLongKey();
- Long2ObjectSortedMap<IntIntPair> ledgerMap = entry.getValue();
- Long2ObjectSortedMap<IntIntPair> ledgerMapHead;
+ Map.Entry<Long, TreeMap<Long, IntIntPair>> entry =
ledgerMapIterator.next();
+ long ledgerId = entry.getKey();
+ TreeMap<Long, IntIntPair> ledgerMap = entry.getValue();
+ TreeMap<Long, IntIntPair> ledgerMapHead;
if (ledgerId == markDeleteLedgerId) {
- ledgerMapHead = ledgerMap.headMap(markDeleteEntryId + 1);
+ ledgerMapHead = new
TreeMap<>(ledgerMap.headMap(markDeleteEntryId + 1));
} else {
ledgerMapHead = ledgerMap;
}
- ObjectBidirectionalIterator<Long2ObjectMap.Entry<IntIntPair>>
entryMapIterator =
- ledgerMapHead.long2ObjectEntrySet().iterator();
+ Iterator<Map.Entry<Long, IntIntPair>> entryMapIterator =
+ ledgerMapHead.entrySet().iterator();
while (entryMapIterator.hasNext()) {
- Long2ObjectMap.Entry<IntIntPair> intIntPairEntry =
entryMapIterator.next();
- long entryId = intIntPairEntry.getLongKey();
+ Map.Entry<Long, IntIntPair> intIntPairEntry =
entryMapIterator.next();
+ long entryId = intIntPairEntry.getKey();
if (!acquiredWriteLock) {
retryWithWriteLock = true;
return;
@@ -391,6 +389,10 @@ public class PendingAcksMap {
pendingAcksRemoveHandler.handleRemoving(consumer,
ledgerId, entryId, stickyKeyHash, closed);
}
entryMapIterator.remove();
+ // also remove from the original map if we're iterating a
copy
+ if (ledgerId == markDeleteLedgerId) {
+ ledgerMap.remove(entryId);
+ }
}
if (ledgerMap.isEmpty()) {
if (!acquiredWriteLock) {
@@ -421,4 +423,4 @@ public class PendingAcksMap {
pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId,
entryId, stickyKeyHash, closed);
}
}
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c2afc35c619..9751a0b2701 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.service.persistent;
import static
org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
import com.google.common.annotations.VisibleForTesting;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -59,6 +57,7 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.IntOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -418,7 +417,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
// in replay read mode, keep track of consumers for entries, used for
look-ahead check
Set<Consumer> consumersForEntriesForLookaheadCheck = lookAheadAllowed
? new HashSet<>() : null;
// track already blocked hashes to block any further messages with the
same hash
- IntSet alreadyBlockedHashes = new IntOpenHashSet();
+ IntOpenHashSet alreadyBlockedHashes = new IntOpenHashSet();
for (Entry inputEntry : entries) {
EntryAndMetadata entry;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index c5a564d1b66..e25595072d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -234,7 +234,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
return;
}
try {
- this.delayedMessageMap.firstLongKey();
+ this.delayedMessageMap.firstKey();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;
diff --git a/pulsar-client-admin-shaded/build.gradle.kts
b/pulsar-client-admin-shaded/build.gradle.kts
index 39f9f01842e..e1da303e84e 100644
--- a/pulsar-client-admin-shaded/build.gradle.kts
+++ b/pulsar-client-admin-shaded/build.gradle.kts
@@ -22,9 +22,6 @@ plugins {
}
dependencies {
- implementation(project(":pulsar-client-admin-original")) {
- exclude(group = "it.unimi.dsi", module = "fastutil")
- }
- implementation(project(":pulsar-client-dependencies-minimized"))
+ implementation(project(":pulsar-client-admin-original"))
implementation(project(":pulsar-client-messagecrypto-bc"))
}
diff --git a/pulsar-client-all/build.gradle.kts
b/pulsar-client-all/build.gradle.kts
index d7961703409..9b32c779138 100644
--- a/pulsar-client-all/build.gradle.kts
+++ b/pulsar-client-all/build.gradle.kts
@@ -23,10 +23,7 @@ plugins {
dependencies {
implementation(project(":pulsar-client-api"))
- implementation(project(":pulsar-client-original")) {
- exclude(group = "it.unimi.dsi", module = "fastutil")
- }
- implementation(project(":pulsar-client-dependencies-minimized"))
+ implementation(project(":pulsar-client-original"))
implementation(project(":pulsar-client-admin-original"))
implementation(project(":pulsar-client-messagecrypto-bc"))
diff --git a/pulsar-client-dependencies-minimized/build.gradle.kts
b/pulsar-client-dependencies-minimized/build.gradle.kts
deleted file mode 100644
index 282208bc773..00000000000
--- a/pulsar-client-dependencies-minimized/build.gradle.kts
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-plugins {
- id("pulsar.shadow-conventions")
-}
-
-dependencies {
- implementation(project(":pulsar-client-original"))
-}
-
-tasks.shadowJar {
- dependencies {
- include(dependency("it.unimi.dsi:fastutil"))
- }
- // Shadow's minimize() is too aggressive (keeps ~12,885 classes vs the
~590 that are needed).
- // Instead, we whitelist the specific classes used by pulsar-client and
pulsar-broker.
- // The patterns below match Maven shade plugin's minimizeJar output, plus
broker-only classes.
- include("META-INF/**")
-
- // Root package utilities (Hash, HashCommon, Arrays, etc.)
- include("it/unimi/dsi/fastutil/*.class")
-
- // --- longs package ---
- // Long2Object maps (used by NegativeAcksTracker,
InMemoryRedeliveryTracker, PendingAcksMap, etc.)
- include("it/unimi/dsi/fastutil/longs/Long2Object*")
- include("it/unimi/dsi/fastutil/longs/AbstractLong2Object*")
- // Long2Int maps (used by InMemoryRedeliveryTracker)
- include("it/unimi/dsi/fastutil/longs/Long2Int*")
- include("it/unimi/dsi/fastutil/longs/AbstractLong2Int*")
- // Cross-type function interfaces (use exact names to avoid matching
Functions$* inner classes)
- for (t in listOf("Boolean", "Byte", "Char", "Double", "Float", "Int",
"Long", "Object", "Reference", "Short")) {
- include("it/unimi/dsi/fastutil/longs/Long2${t}Function.class")
- }
- // LongOpenHashSet (used by InMemoryDelayedDeliveryTracker)
- include("it/unimi/dsi/fastutil/longs/LongOpenHashSet*")
- // Collection/iterator/utility support classes
- include("it/unimi/dsi/fastutil/longs/AbstractLongCollection.class")
- include("it/unimi/dsi/fastutil/longs/AbstractLongIterator.class")
- include("it/unimi/dsi/fastutil/longs/AbstractLongList*")
- include("it/unimi/dsi/fastutil/longs/AbstractLongSet.class")
- include("it/unimi/dsi/fastutil/longs/AbstractLongSortedSet.class")
- include("it/unimi/dsi/fastutil/longs/AbstractLongSpliterator.class")
- include("it/unimi/dsi/fastutil/longs/LongArrayList*")
- include("it/unimi/dsi/fastutil/longs/LongArraySet*")
- include("it/unimi/dsi/fastutil/longs/LongArrays*")
- include("it/unimi/dsi/fastutil/longs/LongBidirectional*")
- include("it/unimi/dsi/fastutil/longs/LongBigArrays*")
- include("it/unimi/dsi/fastutil/longs/LongBigListIterator*")
- include("it/unimi/dsi/fastutil/longs/LongCollection*")
- include("it/unimi/dsi/fastutil/longs/LongComparator*")
- include("it/unimi/dsi/fastutil/longs/LongConsumer*")
- include("it/unimi/dsi/fastutil/longs/LongImmutableList*")
- include("it/unimi/dsi/fastutil/longs/LongIterable*")
- include("it/unimi/dsi/fastutil/longs/LongIterator*")
- include("it/unimi/dsi/fastutil/longs/LongList*")
- include("it/unimi/dsi/fastutil/longs/LongObjectImmutablePair*")
- include("it/unimi/dsi/fastutil/longs/LongObjectPair*")
- include("it/unimi/dsi/fastutil/longs/LongPredicate*")
- include("it/unimi/dsi/fastutil/longs/LongSet*")
- include("it/unimi/dsi/fastutil/longs/LongSorted*")
- include("it/unimi/dsi/fastutil/longs/LongSpliterator*")
- include("it/unimi/dsi/fastutil/longs/LongStack*")
- include("it/unimi/dsi/fastutil/longs/LongUnaryOperator*")
- include("it/unimi/dsi/fastutil/longs/package-info.class")
-
- // --- objects package ---
- // ObjectBidirectionalIterator (used by PendingAcksMap)
- include("it/unimi/dsi/fastutil/objects/ObjectBidirectionalIterable*")
- include("it/unimi/dsi/fastutil/objects/ObjectBidirectionalIterator*")
- // ObjectIntPair (used by Consumer)
- include("it/unimi/dsi/fastutil/objects/ObjectIntPair.class")
- include("it/unimi/dsi/fastutil/objects/ObjectObjectImmutablePair*")
- // Cross-type function interfaces (use exact names to avoid matching
Functions$*Function inner classes)
- @Suppress("SpellCheckingInspection")
- val objectFunctionTypes = listOf(
- "Boolean", "Byte", "Char", "Double", "Float", "Int", "Long", "Object",
"Reference", "Short",
- )
- for (t in objectFunctionTypes) {
- include("it/unimi/dsi/fastutil/objects/Object2${t}Function.class")
- include("it/unimi/dsi/fastutil/objects/Reference2${t}Function.class")
- }
- // Collection/iterator/utility support classes
- include("it/unimi/dsi/fastutil/objects/AbstractObjectCollection.class")
- include("it/unimi/dsi/fastutil/objects/AbstractObjectIterator.class")
- include("it/unimi/dsi/fastutil/objects/AbstractObjectList*")
- include("it/unimi/dsi/fastutil/objects/AbstractObjectSet.class")
- include("it/unimi/dsi/fastutil/objects/AbstractObjectSortedSet.class")
- include("it/unimi/dsi/fastutil/objects/AbstractObjectSpliterator.class")
- include("it/unimi/dsi/fastutil/objects/ObjectArrayList*")
- include("it/unimi/dsi/fastutil/objects/ObjectArraySet*")
- include("it/unimi/dsi/fastutil/objects/ObjectArrays*")
- include("it/unimi/dsi/fastutil/objects/ObjectBigArrays*")
- include("it/unimi/dsi/fastutil/objects/ObjectBigListIterator.class")
- include("it/unimi/dsi/fastutil/objects/ObjectCollection.class")
- include("it/unimi/dsi/fastutil/objects/ObjectCollections*")
- include("it/unimi/dsi/fastutil/objects/ObjectComparators*")
- include("it/unimi/dsi/fastutil/objects/ObjectImmutableList*")
- include("it/unimi/dsi/fastutil/objects/ObjectIterable.class")
- include("it/unimi/dsi/fastutil/objects/ObjectIterator.class")
- include("it/unimi/dsi/fastutil/objects/ObjectIterators*")
- include("it/unimi/dsi/fastutil/objects/ObjectList.class")
- include("it/unimi/dsi/fastutil/objects/ObjectListIterator.class")
- include("it/unimi/dsi/fastutil/objects/ObjectLists*")
- include("it/unimi/dsi/fastutil/objects/ObjectOpenHashSet*")
- include("it/unimi/dsi/fastutil/objects/ObjectSet.class")
- include("it/unimi/dsi/fastutil/objects/ObjectSets*")
- include("it/unimi/dsi/fastutil/objects/ObjectSortedSet.class")
- include("it/unimi/dsi/fastutil/objects/ObjectSortedSets*")
- include("it/unimi/dsi/fastutil/objects/ObjectSpliterator.class")
- include("it/unimi/dsi/fastutil/objects/ObjectSpliterators*")
- include("it/unimi/dsi/fastutil/objects/package-info.class")
-
- // --- ints package (used by broker: DrainingHashesTracker, Consumer,
PersistentStickyKeyDispatcher) ---
- include("it/unimi/dsi/fastutil/ints/Int2Object*")
- include("it/unimi/dsi/fastutil/ints/AbstractInt2Object*")
- for (t in listOf("Boolean", "Byte", "Char", "Double", "Float", "Int",
"Long", "Object", "Reference", "Short")) {
- include("it/unimi/dsi/fastutil/ints/Int2${t}Function.class")
- }
- include("it/unimi/dsi/fastutil/ints/IntOpenHashSet*")
- include("it/unimi/dsi/fastutil/ints/IntIntPair*")
- include("it/unimi/dsi/fastutil/ints/IntIntImmutablePair*")
- include("it/unimi/dsi/fastutil/ints/IntSet*")
- include("it/unimi/dsi/fastutil/ints/AbstractIntCollection*")
- include("it/unimi/dsi/fastutil/ints/AbstractIntIterator*")
- include("it/unimi/dsi/fastutil/ints/AbstractIntSet*")
- include("it/unimi/dsi/fastutil/ints/AbstractIntSpliterator*")
- include("it/unimi/dsi/fastutil/ints/IntCollection*")
- include("it/unimi/dsi/fastutil/ints/IntIterator*")
- include("it/unimi/dsi/fastutil/ints/IntIterable*")
- include("it/unimi/dsi/fastutil/ints/IntConsumer*")
- include("it/unimi/dsi/fastutil/ints/IntPredicate*")
- include("it/unimi/dsi/fastutil/ints/IntSpliterator*")
- include("it/unimi/dsi/fastutil/ints/IntArrays*")
- include("it/unimi/dsi/fastutil/ints/IntBigArrays*")
- include("it/unimi/dsi/fastutil/ints/IntComparator*")
- include("it/unimi/dsi/fastutil/ints/package-info.class")
-
- // --- io package (transitively referenced) ---
- include("it/unimi/dsi/fastutil/io/**")
-
- // --- other primitive types: only cross-type Function interfaces + basic
utilities ---
- // Use type-prefixed patterns to avoid matching inner classes (e.g.,
*Iterator.class would
- // match AbstractByte2BooleanSortedMap$KeySetIterator.class).
- @Suppress("SpellCheckingInspection")
- val otherTypes = mapOf(
- "booleans" to "Boolean", "bytes" to "Byte", "chars" to "Char",
- "doubles" to "Double", "floats" to "Float", "shorts" to "Short",
- )
- for ((pkg, prefix) in otherTypes) {
- for (t in listOf("Boolean", "Byte", "Char", "Double", "Float", "Int",
"Long", "Object", "Reference", "Short")) {
- include("it/unimi/dsi/fastutil/$pkg/${prefix}2${t}Function.class")
- }
- include("it/unimi/dsi/fastutil/$pkg/${prefix}Arrays*")
- include("it/unimi/dsi/fastutil/$pkg/${prefix}BigArrays*")
- include("it/unimi/dsi/fastutil/$pkg/${prefix}Comparator.class")
- include("it/unimi/dsi/fastutil/$pkg/${prefix}Comparators*")
- include("it/unimi/dsi/fastutil/$pkg/${prefix}Consumer.class")
- include("it/unimi/dsi/fastutil/$pkg/${prefix}Iterator.class")
- include("it/unimi/dsi/fastutil/$pkg/${prefix}Spliterator.class")
- include("it/unimi/dsi/fastutil/$pkg/package-info.class")
- }
-}
-
diff --git a/pulsar-client-shaded/build.gradle.kts
b/pulsar-client-shaded/build.gradle.kts
index 7394342f24b..8cc9c60f186 100644
--- a/pulsar-client-shaded/build.gradle.kts
+++ b/pulsar-client-shaded/build.gradle.kts
@@ -23,9 +23,6 @@ plugins {
dependencies {
implementation(project(":pulsar-client-api"))
- implementation(project(":pulsar-client-original")) {
- exclude(group = "it.unimi.dsi", module = "fastutil")
- }
- implementation(project(":pulsar-client-dependencies-minimized"))
+ implementation(project(":pulsar-client-original"))
implementation(project(":pulsar-client-messagecrypto-bc"))
}
diff --git a/pulsar-client/build.gradle.kts b/pulsar-client/build.gradle.kts
index d19cc182171..2ce1e57112b 100644
--- a/pulsar-client/build.gradle.kts
+++ b/pulsar-client/build.gradle.kts
@@ -58,7 +58,6 @@ dependencies {
implementation(libs.jsr305)
implementation(libs.jspecify)
implementation(libs.roaringbitmap)
- implementation(libs.fastutil)
compileOnly(libs.swagger.annotations)
compileOnly(libs.protobuf.java)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index d975d22be7d..58742c0bdc4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -23,14 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.opentelemetry.api.trace.Span;
-import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
-import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import java.io.Closeable;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -38,6 +35,8 @@ import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.TraceableMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.collections.Long2ObjectMap;
+import org.apache.pulsar.common.util.collections.Long2ObjectOpenHashMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +46,8 @@ class NegativeAcksTracker implements Closeable {
// timestamp -> ledgerId -> entryId, no need to batch index, if different
messages have
// different timestamp, there will be multiple entries in the map
- // RB Tree -> LongOpenHashMap -> Roaring64Bitmap
- private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
nackedMessages = null;
+ // TreeMap -> LongOpenHashMap -> Roaring64Bitmap
+ private TreeMap<Long, Long2ObjectMap<Roaring64Bitmap>> nackedMessages =
null;
private final Long2ObjectMap<Long2ObjectMap<MessageId>> nackedMessageIds =
new Long2ObjectOpenHashMap<>();
private final ConsumerBase<?> consumer;
@@ -88,9 +87,7 @@ class NegativeAcksTracker implements Closeable {
}
Long2ObjectMap<Roaring64Bitmap> ledgerMap =
nackedMessages.get(timestamp);
- for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry :
ledgerMap.long2ObjectEntrySet()) {
- long ledgerId = ledgerEntry.getLongKey();
- Roaring64Bitmap entrySet = ledgerEntry.getValue();
+ ledgerMap.forEach((ledgerId, entrySet) -> {
entrySet.forEach(entryId -> {
MessageId msgId = null;
Long2ObjectMap<MessageId> entryMap =
nackedMessageIds.get(ledgerId);
@@ -106,13 +103,13 @@ class NegativeAcksTracker implements Closeable {
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId,
messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
- }
+ });
}
// remove entries from the nackedMessages map
- LongBidirectionalIterator iterator =
nackedMessages.keySet().iterator();
+ Iterator<Long> iterator = nackedMessages.keySet().iterator();
while (iterator.hasNext()) {
- long timestamp = iterator.nextLong();
+ long timestamp = iterator.next();
if (timestamp <= currentTimestamp) {
iterator.remove();
} else {
@@ -122,7 +119,7 @@ class NegativeAcksTracker implements Closeable {
// Schedule the next redelivery if there are still messages to
redeliver
if (!nackedMessages.isEmpty()) {
- long nextTriggerTimestamp = nackedMessages.firstLongKey();
+ long nextTriggerTimestamp = nackedMessages.firstKey();
long delayMs = Math.max(nextTriggerTimestamp -
currentTimestamp, 0);
if (delayMs > 0) {
this.timeout = timer.newTimeout(this::triggerRedelivery,
delayMs, TimeUnit.MILLISECONDS);
@@ -166,7 +163,7 @@ class NegativeAcksTracker implements Closeable {
}
if (nackedMessages == null) {
- nackedMessages = new Long2ObjectAVLTreeMap<>();
+ nackedMessages = new TreeMap<>();
}
long backoffMs;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java
new file mode 100644
index 00000000000..ada501319d7
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMap.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+/**
+ * Open-addressing hash map with primitive int keys and object values.
+ * Uses linear probing and fibonacci hashing.
+ * Not thread-safe.
+ */
+@SuppressWarnings("unchecked")
+public class Int2ObjectOpenHashMap<V> {
+
+ private static final float LOAD_FACTOR = 0.75f;
+ private static final int MIN_CAPACITY = 16;
+
+ private int[] keys;
+ private Object[] values;
+ private boolean[] used;
+ private int size;
+ private int capacity;
+ private int threshold;
+
+ public Int2ObjectOpenHashMap() {
+ this(MIN_CAPACITY);
+ }
+
+ public Int2ObjectOpenHashMap(int expectedItems) {
+ int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems /
LOAD_FACTOR) + 1));
+ keys = new int[cap];
+ values = new Object[cap];
+ used = new boolean[cap];
+ capacity = cap;
+ threshold = (int) (cap * LOAD_FACTOR);
+ }
+
+ public V get(int key) {
+ int idx = indexOf(key);
+ return idx >= 0 ? (V) values[idx] : null;
+ }
+
+ public V put(int key, V value) {
+ int idx = indexOf(key);
+ if (idx >= 0) {
+ V old = (V) values[idx];
+ values[idx] = value;
+ return old;
+ }
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ }
+ insertNew(key, value);
+ return null;
+ }
+
+ public V remove(int key) {
+ int idx = indexOf(key);
+ if (idx < 0) {
+ return null;
+ }
+ V old = (V) values[idx];
+ removeAt(idx);
+ return old;
+ }
+
+ /**
+ * Remove the entry only if it maps to the given value (by reference
equality).
+ *
+ * @return true if the entry was removed
+ */
+ public boolean remove(int key, Object value) {
+ int idx = indexOf(key);
+ if (idx < 0 || values[idx] != value) {
+ return false;
+ }
+ removeAt(idx);
+ return true;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public void clear() {
+ if (size > 0) {
+ java.util.Arrays.fill(used, false);
+ java.util.Arrays.fill(values, null);
+ size = 0;
+ }
+ }
+
+ private int indexOf(int key) {
+ int mask = capacity - 1;
+ int idx = hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ return -1;
+ }
+ if (keys[idx] == key) {
+ return idx;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ private void insertNew(int key, V value) {
+ int mask = capacity - 1;
+ int idx = hash(key) & mask;
+ while (used[idx]) {
+ idx = (idx + 1) & mask;
+ }
+ keys[idx] = key;
+ values[idx] = value;
+ used[idx] = true;
+ size++;
+ }
+
+ private void removeAt(int idx) {
+ int mask = capacity - 1;
+ values[idx] = null;
+ size--;
+ int next = (idx + 1) & mask;
+ while (used[next]) {
+ int naturalSlot = hash(keys[next]) & mask;
+ if ((next > idx && (naturalSlot <= idx || naturalSlot > next))
+ || (next < idx && (naturalSlot <= idx && naturalSlot >
next))) {
+ keys[idx] = keys[next];
+ values[idx] = values[next];
+ values[next] = null;
+ idx = next;
+ }
+ next = (next + 1) & mask;
+ }
+ used[idx] = false;
+ }
+
+ private void rehash(int newCapacity) {
+ int[] oldKeys = keys;
+ Object[] oldValues = values;
+ boolean[] oldUsed = used;
+ int oldCapacity = capacity;
+
+ capacity = newCapacity;
+ keys = new int[newCapacity];
+ values = new Object[newCapacity];
+ used = new boolean[newCapacity];
+ threshold = (int) (newCapacity * LOAD_FACTOR);
+ size = 0;
+
+ for (int i = 0; i < oldCapacity; i++) {
+ if (oldUsed[i]) {
+ insertNew(oldKeys[i], (V) oldValues[i]);
+ }
+ }
+ }
+
+ /**
+ * Fibonacci hashing for int keys.
+ */
+ private static int hash(int key) {
+ int h = key * 0x9E3779B9;
+ return h ^ (h >>> 16);
+ }
+
+ private static int tableSizeFor(int cap) {
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1;
+ }
+}
diff --git a/pulsar-client-admin-shaded/build.gradle.kts
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java
similarity index 71%
copy from pulsar-client-admin-shaded/build.gradle.kts
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java
index 39f9f01842e..ee6540164c0 100644
--- a/pulsar-client-admin-shaded/build.gradle.kts
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntIntPair.java
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.common.util.collections;
-plugins {
- id("pulsar.client-shade-conventions")
-}
-
-dependencies {
- implementation(project(":pulsar-client-admin-original")) {
- exclude(group = "it.unimi.dsi", module = "fastutil")
+/**
+ * An immutable pair of int values.
+ */
+public record IntIntPair(int leftInt, int rightInt) {
+ public static IntIntPair of(int left, int right) {
+ return new IntIntPair(left, right);
}
- implementation(project(":pulsar-client-dependencies-minimized"))
- implementation(project(":pulsar-client-messagecrypto-bc"))
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java
new file mode 100644
index 00000000000..be7ffd8076e
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/IntOpenHashSet.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+/**
+ * Open-addressing hash set for primitive int values.
+ * Not thread-safe.
+ */
+public class IntOpenHashSet {
+
+ private static final float LOAD_FACTOR = 0.75f;
+ private static final int MIN_CAPACITY = 16;
+
+ private int[] keys;
+ private boolean[] used;
+ private int size;
+ private int capacity;
+ private int threshold;
+
+ public IntOpenHashSet() {
+ this(MIN_CAPACITY);
+ }
+
+ public IntOpenHashSet(int expectedItems) {
+ int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems /
LOAD_FACTOR) + 1));
+ keys = new int[cap];
+ used = new boolean[cap];
+ capacity = cap;
+ threshold = (int) (cap * LOAD_FACTOR);
+ }
+
+ public boolean add(int key) {
+ int mask = capacity - 1;
+ int idx = hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ return add(key);
+ }
+ keys[idx] = key;
+ used[idx] = true;
+ size++;
+ return true;
+ }
+ if (keys[idx] == key) {
+ return false;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ public boolean contains(int key) {
+ int mask = capacity - 1;
+ int idx = hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ return false;
+ }
+ if (keys[idx] == key) {
+ return true;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ private void rehash(int newCapacity) {
+ int[] oldKeys = keys;
+ boolean[] oldUsed = used;
+ int oldCapacity = capacity;
+
+ capacity = newCapacity;
+ keys = new int[newCapacity];
+ used = new boolean[newCapacity];
+ threshold = (int) (newCapacity * LOAD_FACTOR);
+ size = 0;
+
+ for (int i = 0; i < oldCapacity; i++) {
+ if (oldUsed[i]) {
+ add(oldKeys[i]);
+ }
+ }
+ }
+
+ private static int hash(int key) {
+ int h = key * 0x9E3779B9;
+ return h ^ (h >>> 16);
+ }
+
+ private static int tableSizeFor(int cap) {
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1;
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java
new file mode 100644
index 00000000000..10b2e30b9e4
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntMap.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.function.LongToIntFunction;
+
+/**
+ * A map with primitive {@code long} keys and primitive {@code int} values.
+ *
+ * <p>The default return value for missing keys is {@code 0}.
+ */
+public interface Long2IntMap {
+
+ /**
+ * Returns the value for the given key, or {@code 0} if not present.
+ *
+ * @param key the key
+ * @return the mapped value, or {@code 0}
+ */
+ int get(long key);
+
+ /**
+ * Associates the given value with the given key.
+ *
+ * @param key the key
+ * @param value the value
+ * @return the previous value, or {@code 0} if there was no mapping
+ */
+ int put(long key, int value);
+
+ /**
+ * Removes the mapping for the given key.
+ *
+ * @param key the key
+ * @return the previous value, or {@code 0} if there was no mapping
+ */
+ int remove(long key);
+
+ /**
+ * Returns the value for the given key, or the specified default if not
present.
+ *
+ * @param key the key
+ * @param defaultValue the default value to return if the key is absent
+ * @return the mapped value, or {@code defaultValue}
+ */
+ int getOrDefault(long key, int defaultValue);
+
+ /**
+ * If the key is not already present, computes its value using the given
function
+ * and inserts it.
+ *
+ * @param key the key
+ * @param mappingFunction the function to compute a value
+ * @return the current (existing or computed) value
+ */
+ int computeIfAbsent(long key, LongToIntFunction mappingFunction);
+
+ /**
+ * Returns {@code true} if this map contains no entries.
+ */
+ boolean isEmpty();
+
+ /**
+ * Removes all entries from this map.
+ */
+ void clear();
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java
new file mode 100644
index 00000000000..22e876dc683
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMap.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.function.LongToIntFunction;
+
+/**
+ * Open-addressing hash map with primitive long keys and primitive int values.
+ * Uses linear probing and fibonacci hashing. Returns 0 for missing keys.
+ * Not thread-safe.
+ */
+public class Long2IntOpenHashMap implements Long2IntMap {
+
+ private static final float LOAD_FACTOR = 0.75f;
+ private static final int MIN_CAPACITY = 16;
+
+ private long[] keys;
+ private int[] values;
+ private boolean[] used;
+ private int size;
+ private int capacity;
+ private int threshold;
+
+ public Long2IntOpenHashMap() {
+ this(MIN_CAPACITY);
+ }
+
+ public Long2IntOpenHashMap(int expectedItems) {
+ int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems /
LOAD_FACTOR) + 1));
+ keys = new long[cap];
+ values = new int[cap];
+ used = new boolean[cap];
+ capacity = cap;
+ threshold = (int) (cap * LOAD_FACTOR);
+ }
+
+ @Override
+ public int get(long key) {
+ int idx = indexOf(key);
+ return idx >= 0 ? values[idx] : 0;
+ }
+
+ @Override
+ public int put(long key, int value) {
+ int idx = indexOf(key);
+ if (idx >= 0) {
+ int old = values[idx];
+ values[idx] = value;
+ return old;
+ }
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ }
+ insertNew(key, value);
+ return 0;
+ }
+
+ @Override
+ public int remove(long key) {
+ int idx = indexOf(key);
+ if (idx < 0) {
+ return 0;
+ }
+ int old = values[idx];
+ removeAt(idx);
+ return old;
+ }
+
+ @Override
+ public int getOrDefault(long key, int defaultValue) {
+ int idx = indexOf(key);
+ return idx >= 0 ? values[idx] : defaultValue;
+ }
+
+ @Override
+ public int computeIfAbsent(long key, LongToIntFunction mappingFunction) {
+ int idx = indexOf(key);
+ if (idx >= 0) {
+ return values[idx];
+ }
+ int value = mappingFunction.applyAsInt(key);
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ }
+ insertNew(key, value);
+ return value;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ @Override
+ public void clear() {
+ if (size > 0) {
+ java.util.Arrays.fill(used, false);
+ size = 0;
+ }
+ }
+
+ private int indexOf(long key) {
+ int mask = capacity - 1;
+ int idx = Long2ObjectOpenHashMap.hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ return -1;
+ }
+ if (keys[idx] == key) {
+ return idx;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ private void insertNew(long key, int value) {
+ int mask = capacity - 1;
+ int idx = Long2ObjectOpenHashMap.hash(key) & mask;
+ while (used[idx]) {
+ idx = (idx + 1) & mask;
+ }
+ keys[idx] = key;
+ values[idx] = value;
+ used[idx] = true;
+ size++;
+ }
+
+ private void removeAt(int idx) {
+ int mask = capacity - 1;
+ size--;
+ int next = (idx + 1) & mask;
+ while (used[next]) {
+ int naturalSlot = Long2ObjectOpenHashMap.hash(keys[next]) & mask;
+ if ((next > idx && (naturalSlot <= idx || naturalSlot > next))
+ || (next < idx && (naturalSlot <= idx && naturalSlot >
next))) {
+ keys[idx] = keys[next];
+ values[idx] = values[next];
+ idx = next;
+ }
+ next = (next + 1) & mask;
+ }
+ used[idx] = false;
+ }
+
+ private void rehash(int newCapacity) {
+ long[] oldKeys = keys;
+ int[] oldValues = values;
+ boolean[] oldUsed = used;
+ int oldCapacity = capacity;
+
+ capacity = newCapacity;
+ keys = new long[newCapacity];
+ values = new int[newCapacity];
+ used = new boolean[newCapacity];
+ threshold = (int) (newCapacity * LOAD_FACTOR);
+ size = 0;
+
+ for (int i = 0; i < oldCapacity; i++) {
+ if (oldUsed[i]) {
+ insertNew(oldKeys[i], oldValues[i]);
+ }
+ }
+ }
+
+ private static int tableSizeFor(int cap) {
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1;
+ }
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java
new file mode 100644
index 00000000000..56891027d46
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectMap.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.Collection;
+import java.util.function.LongFunction;
+
+/**
+ * A map with primitive {@code long} keys and object values.
+ *
+ * <p>Using primitive keys avoids the overhead of boxing {@code long} values
into
+ * {@link Long} objects, reducing both memory usage and GC pressure compared to
+ * {@code Map<Long, V>}.
+ *
+ * @param <V> the type of mapped values
+ */
+public interface Long2ObjectMap<V> {
+
+ /**
+ * Returns the value associated with the given key, or {@code null} if not
present.
+ *
+ * @param key the key
+ * @return the value, or {@code null}
+ */
+ V get(long key);
+
+ /**
+ * Associates the given value with the given key.
+ *
+ * @param key the key
+ * @param value the value
+ * @return the previous value, or {@code null} if there was no mapping
+ */
+ V put(long key, V value);
+
+ /**
+ * Removes the mapping for the given key.
+ *
+ * @param key the key
+ * @return the previous value, or {@code null} if there was no mapping
+ */
+ V remove(long key);
+
+ /**
+ * If the key is not already present, computes its value using the given
function
+ * and inserts it.
+ *
+ * @param key the key
+ * @param mappingFunction the function to compute a value
+ * @return the current (existing or computed) value
+ */
+ V computeIfAbsent(long key, LongFunction<? extends V> mappingFunction);
+
+ /**
+ * Returns {@code true} if this map contains no entries.
+ */
+ boolean isEmpty();
+
+ /**
+ * Returns the number of entries in this map.
+ */
+ int size();
+
+ /**
+ * Removes all entries from this map.
+ */
+ void clear();
+
+ /**
+ * Iterates over all entries, calling the consumer with primitive long keys
+ * (no boxing).
+ *
+ * @param consumer the consumer to call for each entry
+ */
+ void forEach(LongObjConsumer<? super V> consumer);
+
+ /**
+ * Returns a {@link Collection} view of the values in this map.
+ * The collection supports iteration and {@code stream()} operations.
+ */
+ Collection<V> values();
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java
new file mode 100644
index 00000000000..70bc888bc84
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMap.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.LongFunction;
+
+/**
+ * Open-addressing hash map with primitive long keys and object values.
+ * Uses linear probing and fibonacci hashing for good distribution.
+ * Not thread-safe.
+ */
+@SuppressWarnings("unchecked")
+public class Long2ObjectOpenHashMap<V> implements Long2ObjectMap<V> {
+
+ private static final float LOAD_FACTOR = 0.75f;
+ private static final int MIN_CAPACITY = 16;
+
+ private long[] keys;
+ private Object[] values;
+ private boolean[] used;
+ private int size;
+ private int capacity;
+ private int threshold;
+
+ /**
+ * Creates a new map with default capacity.
+ */
+ public Long2ObjectOpenHashMap() {
+ this(MIN_CAPACITY);
+ }
+
+ /**
+ * Creates a new map sized to hold the expected number of items without
rehashing.
+ *
+ * @param expectedItems the expected number of items
+ */
+ public Long2ObjectOpenHashMap(int expectedItems) {
+ int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems /
LOAD_FACTOR) + 1));
+ keys = new long[cap];
+ values = new Object[cap];
+ used = new boolean[cap];
+ capacity = cap;
+ threshold = (int) (cap * LOAD_FACTOR);
+ }
+
+ @Override
+ public V get(long key) {
+ int idx = indexOf(key);
+ if (idx >= 0) {
+ return (V) values[idx];
+ }
+ return null;
+ }
+
+ @Override
+ public V put(long key, V value) {
+ int idx = indexOf(key);
+ if (idx >= 0) {
+ V old = (V) values[idx];
+ values[idx] = value;
+ return old;
+ }
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ }
+ insertNew(key, value);
+ return null;
+ }
+
+ @Override
+ public V remove(long key) {
+ int idx = indexOf(key);
+ if (idx < 0) {
+ return null;
+ }
+ V old = (V) values[idx];
+ removeAt(idx);
+ return old;
+ }
+
+ @Override
+ public V computeIfAbsent(long key, LongFunction<? extends V>
mappingFunction) {
+ int idx = indexOf(key);
+ if (idx >= 0) {
+ return (V) values[idx];
+ }
+ V value = mappingFunction.apply(key);
+ if (value != null) {
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ }
+ insertNew(key, value);
+ }
+ return value;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public void clear() {
+ if (size > 0) {
+ java.util.Arrays.fill(used, false);
+ java.util.Arrays.fill(values, null);
+ size = 0;
+ }
+ }
+
+ @Override
+ public void forEach(LongObjConsumer<? super V> consumer) {
+ for (int i = 0; i < capacity; i++) {
+ if (used[i]) {
+ consumer.accept(keys[i], (V) values[i]);
+ }
+ }
+ }
+
+ @Override
+ public Collection<V> values() {
+ return new ValuesCollection();
+ }
+
+ /**
+ * Find the index of the given key, or -1 if not present.
+ */
+ private int indexOf(long key) {
+ int mask = capacity - 1;
+ int idx = hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ return -1;
+ }
+ if (keys[idx] == key) {
+ return idx;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ private void insertNew(long key, V value) {
+ int mask = capacity - 1;
+ int idx = hash(key) & mask;
+ while (used[idx]) {
+ idx = (idx + 1) & mask;
+ }
+ keys[idx] = key;
+ values[idx] = value;
+ used[idx] = true;
+ size++;
+ }
+
+ /**
+ * Remove the entry at the given index using backward-shift deletion
+ * (no tombstones needed).
+ */
+ private void removeAt(int idx) {
+ int mask = capacity - 1;
+ values[idx] = null;
+ size--;
+ // Shift back entries that may have been displaced by the removed entry
+ int next = (idx + 1) & mask;
+ while (used[next]) {
+ int naturalSlot = hash(keys[next]) & mask;
+ // Check if 'next' is displaced past 'idx' (wrapping around)
+ if ((next > idx && (naturalSlot <= idx || naturalSlot > next))
+ || (next < idx && (naturalSlot <= idx && naturalSlot >
next))) {
+ keys[idx] = keys[next];
+ values[idx] = values[next];
+ values[next] = null;
+ idx = next;
+ }
+ next = (next + 1) & mask;
+ }
+ used[idx] = false;
+ }
+
+ private void rehash(int newCapacity) {
+ long[] oldKeys = keys;
+ Object[] oldValues = values;
+ boolean[] oldUsed = used;
+ int oldCapacity = capacity;
+
+ capacity = newCapacity;
+ keys = new long[newCapacity];
+ values = new Object[newCapacity];
+ used = new boolean[newCapacity];
+ threshold = (int) (newCapacity * LOAD_FACTOR);
+ size = 0;
+
+ for (int i = 0; i < oldCapacity; i++) {
+ if (oldUsed[i]) {
+ insertNew(oldKeys[i], (V) oldValues[i]);
+ }
+ }
+ }
+
+ /**
+ * Fibonacci hashing for long keys.
+ */
+ static int hash(long key) {
+ long h = key * 0x9E3779B97F4A7C15L;
+ return (int) (h ^ (h >>> 32));
+ }
+
+ private static int tableSizeFor(int cap) {
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1;
+ }
+
+ private class ValuesCollection extends AbstractCollection<V> {
+ @Override
+ public Iterator<V> iterator() {
+ return new Iterator<>() {
+ private int idx = findNext(0);
+
+ @Override
+ public boolean hasNext() {
+ return idx < capacity;
+ }
+
+ @Override
+ public V next() {
+ if (idx >= capacity) {
+ throw new NoSuchElementException();
+ }
+ V val = (V) values[idx];
+ idx = findNext(idx + 1);
+ return val;
+ }
+
+ private int findNext(int from) {
+ while (from < capacity && !used[from]) {
+ from++;
+ }
+ return from;
+ }
+ };
+ }
+
+ @Override
+ public int size() {
+ return Long2ObjectOpenHashMap.this.size;
+ }
+ }
+}
diff --git a/pulsar-client-admin-shaded/build.gradle.kts
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java
similarity index 70%
copy from pulsar-client-admin-shaded/build.gradle.kts
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java
index 39f9f01842e..a3dd6108e51 100644
--- a/pulsar-client-admin-shaded/build.gradle.kts
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongObjConsumer.java
@@ -16,15 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.common.util.collections;
-plugins {
- id("pulsar.client-shade-conventions")
-}
-
-dependencies {
- implementation(project(":pulsar-client-admin-original")) {
- exclude(group = "it.unimi.dsi", module = "fastutil")
- }
- implementation(project(":pulsar-client-dependencies-minimized"))
- implementation(project(":pulsar-client-messagecrypto-bc"))
+/**
+ * A consumer that accepts a primitive long key and an object value.
+ */
+@FunctionalInterface
+public interface LongObjConsumer<V> {
+ void accept(long key, V value);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java
new file mode 100644
index 00000000000..48f3720dff7
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongOpenHashSet.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.LongConsumer;
+
+/**
+ * Open-addressing hash set for primitive long values.
+ * Not thread-safe.
+ */
+public class LongOpenHashSet implements Iterable<Long> {
+
+ private static final float LOAD_FACTOR = 0.75f;
+ private static final int MIN_CAPACITY = 16;
+
+ private long[] keys;
+ private boolean[] used;
+ private int size;
+ private int capacity;
+ private int threshold;
+
+ public LongOpenHashSet() {
+ this(MIN_CAPACITY);
+ }
+
+ public LongOpenHashSet(int expectedItems) {
+ int cap = tableSizeFor(Math.max(MIN_CAPACITY, (int) (expectedItems /
LOAD_FACTOR) + 1));
+ keys = new long[cap];
+ used = new boolean[cap];
+ capacity = cap;
+ threshold = (int) (cap * LOAD_FACTOR);
+ }
+
+ public boolean add(long key) {
+ int mask = capacity - 1;
+ int idx = Long2ObjectOpenHashMap.hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ if (size >= threshold) {
+ rehash(capacity * 2);
+ return add(key);
+ }
+ keys[idx] = key;
+ used[idx] = true;
+ size++;
+ return true;
+ }
+ if (keys[idx] == key) {
+ return false;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ public boolean contains(long key) {
+ int mask = capacity - 1;
+ int idx = Long2ObjectOpenHashMap.hash(key) & mask;
+ while (true) {
+ if (!used[idx]) {
+ return false;
+ }
+ if (keys[idx] == key) {
+ return true;
+ }
+ idx = (idx + 1) & mask;
+ }
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public void forEach(LongConsumer consumer) {
+ for (int i = 0; i < capacity; i++) {
+ if (used[i]) {
+ consumer.accept(keys[i]);
+ }
+ }
+ }
+
+ @Override
+ public Iterator<Long> iterator() {
+ return new Iterator<>() {
+ private int idx = findNext(0);
+
+ @Override
+ public boolean hasNext() {
+ return idx < capacity;
+ }
+
+ @Override
+ public Long next() {
+ if (idx >= capacity) {
+ throw new NoSuchElementException();
+ }
+ long val = keys[idx];
+ idx = findNext(idx + 1);
+ return val;
+ }
+
+ private int findNext(int from) {
+ while (from < capacity && !used[from]) {
+ from++;
+ }
+ return from;
+ }
+ };
+ }
+
+ private void rehash(int newCapacity) {
+ long[] oldKeys = keys;
+ boolean[] oldUsed = used;
+ int oldCapacity = capacity;
+
+ capacity = newCapacity;
+ keys = new long[newCapacity];
+ used = new boolean[newCapacity];
+ threshold = (int) (newCapacity * LOAD_FACTOR);
+ size = 0;
+
+ for (int i = 0; i < oldCapacity; i++) {
+ if (oldUsed[i]) {
+ add(oldKeys[i]);
+ }
+ }
+ }
+
+ private static int tableSizeFor(int cap) {
+ int n = cap - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < MIN_CAPACITY) ? MIN_CAPACITY : n + 1;
+ }
+}
diff --git a/pulsar-client-admin-shaded/build.gradle.kts
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java
similarity index 71%
copy from pulsar-client-admin-shaded/build.gradle.kts
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java
index 39f9f01842e..8134afb091e 100644
--- a/pulsar-client-admin-shaded/build.gradle.kts
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ObjectIntPair.java
@@ -16,15 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.common.util.collections;
-plugins {
- id("pulsar.client-shade-conventions")
-}
-
-dependencies {
- implementation(project(":pulsar-client-admin-original")) {
- exclude(group = "it.unimi.dsi", module = "fastutil")
+/**
+ * An immutable pair of an object and an int value.
+ */
+public record ObjectIntPair<T>(T left, int rightInt) {
+ public static <T> ObjectIntPair<T> of(T left, int right) {
+ return new ObjectIntPair<>(left, right);
}
- implementation(project(":pulsar-client-dependencies-minimized"))
- implementation(project(":pulsar-client-messagecrypto-bc"))
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java
new file mode 100644
index 00000000000..2558e6af38d
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/HashSetTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.testng.annotations.Test;
+
+public class HashSetTest {
+
+ @Test
+ public void testLongOpenHashSetBasic() {
+ LongOpenHashSet set = new LongOpenHashSet();
+ assertTrue(set.isEmpty());
+ assertTrue(set.add(1));
+ assertTrue(set.add(2));
+ assertFalse(set.add(1)); // duplicate
+ assertEquals(set.size(), 2);
+ assertTrue(set.contains(1));
+ assertTrue(set.contains(2));
+ assertFalse(set.contains(3));
+ }
+
+ @Test
+ public void testLongOpenHashSetIterable() {
+ LongOpenHashSet set = new LongOpenHashSet();
+ set.add(3);
+ set.add(1);
+ set.add(2);
+ List<Long> values = new ArrayList<>();
+ for (long v : set) {
+ values.add(v);
+ }
+ Collections.sort(values);
+ assertEquals(values, List.of(1L, 2L, 3L));
+ }
+
+ @Test
+ public void testLongOpenHashSetForEach() {
+ LongOpenHashSet set = new LongOpenHashSet();
+ set.add(10);
+ set.add(20);
+ List<Long> values = new ArrayList<>();
+ set.forEach((long v) -> values.add(v));
+ Collections.sort(values);
+ assertEquals(values, List.of(10L, 20L));
+ }
+
+ @Test
+ public void testLongOpenHashSetRehash() {
+ LongOpenHashSet set = new LongOpenHashSet(4);
+ for (int i = 0; i < 100; i++) {
+ set.add(i);
+ }
+ assertEquals(set.size(), 100);
+ for (int i = 0; i < 100; i++) {
+ assertTrue(set.contains(i));
+ }
+ }
+
+ @Test
+ public void testIntOpenHashSetBasic() {
+ IntOpenHashSet set = new IntOpenHashSet();
+ assertTrue(set.isEmpty());
+ assertTrue(set.add(1));
+ assertTrue(set.add(2));
+ assertFalse(set.add(1)); // duplicate
+ assertEquals(set.size(), 2);
+ assertTrue(set.contains(1));
+ assertTrue(set.contains(2));
+ assertFalse(set.contains(3));
+ }
+
+ @Test
+ public void testIntOpenHashSetRehash() {
+ IntOpenHashSet set = new IntOpenHashSet(4);
+ for (int i = 0; i < 100; i++) {
+ set.add(i);
+ }
+ assertEquals(set.size(), 100);
+ for (int i = 0; i < 100; i++) {
+ assertTrue(set.contains(i));
+ }
+ }
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java
new file mode 100644
index 00000000000..75c9e4d3d4b
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Int2ObjectOpenHashMapTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+public class Int2ObjectOpenHashMapTest {
+
+ @Test
+ public void testEmpty() {
+ Int2ObjectOpenHashMap<String> map = new Int2ObjectOpenHashMap<>();
+ assertTrue(map.isEmpty());
+ assertNull(map.get(1));
+ }
+
+ @Test
+ public void testPutGet() {
+ Int2ObjectOpenHashMap<String> map = new Int2ObjectOpenHashMap<>();
+ assertNull(map.put(1, "one"));
+ assertNull(map.put(2, "two"));
+ assertEquals(map.get(1), "one");
+ assertEquals(map.get(2), "two");
+ assertNull(map.get(3));
+ assertEquals(map.size(), 2);
+ }
+
+ @Test
+ public void testRemove() {
+ Int2ObjectOpenHashMap<String> map = new Int2ObjectOpenHashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ assertEquals(map.remove(1), "one");
+ assertNull(map.get(1));
+ assertEquals(map.size(), 1);
+ }
+
+ @Test
+ public void testRemoveConditional() {
+ Int2ObjectOpenHashMap<String> map = new Int2ObjectOpenHashMap<>();
+ String val = "one";
+ map.put(1, val);
+ assertFalse(map.remove(1, "other")); // different ref
+ assertTrue(map.remove(1, val));
+ assertTrue(map.isEmpty());
+ }
+
+ @Test
+ public void testClear() {
+ Int2ObjectOpenHashMap<String> map = new Int2ObjectOpenHashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ map.clear();
+ assertTrue(map.isEmpty());
+ assertNull(map.get(1));
+ }
+
+ @Test
+ public void testRehash() {
+ Int2ObjectOpenHashMap<Integer> map = new Int2ObjectOpenHashMap<>(4);
+ for (int i = 0; i < 100; i++) {
+ map.put(i, i);
+ }
+ assertEquals(map.size(), 100);
+ for (int i = 0; i < 100; i++) {
+ assertEquals(map.get(i), Integer.valueOf(i));
+ }
+ }
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java
new file mode 100644
index 00000000000..7ced0265252
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2IntOpenHashMapTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.Test;
+
+public class Long2IntOpenHashMapTest {
+
+ @Test
+ public void testEmpty() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ assertTrue(map.isEmpty());
+ assertEquals(map.get(0), 0);
+ assertEquals(map.get(1), 0);
+ }
+
+ @Test
+ public void testPutGet() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ assertEquals(map.put(1, 10), 0);
+ assertEquals(map.put(2, 20), 0);
+ assertFalse(map.isEmpty());
+ assertEquals(map.get(1), 10);
+ assertEquals(map.get(2), 20);
+ assertEquals(map.get(3), 0); // default
+ }
+
+ @Test
+ public void testPutReplace() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ map.put(1, 10);
+ assertEquals(map.put(1, 100), 10);
+ assertEquals(map.get(1), 100);
+ }
+
+ @Test
+ public void testRemove() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ map.put(1, 10);
+ map.put(2, 20);
+ assertEquals(map.remove(1), 10);
+ assertEquals(map.get(1), 0); // default after removal
+ assertEquals(map.remove(99), 0); // not present
+ }
+
+ @Test
+ public void testGetOrDefault() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ map.put(1, 10);
+ assertEquals(map.getOrDefault(1, -1), 10);
+ assertEquals(map.getOrDefault(2, -1), -1);
+ }
+
+ @Test
+ public void testComputeIfAbsent() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ assertEquals(map.computeIfAbsent(1, k -> 10), 10);
+ assertEquals(map.computeIfAbsent(1, k -> 99), 10);
+ }
+
+ @Test
+ public void testClear() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap();
+ map.put(1, 10);
+ map.put(2, 20);
+ map.clear();
+ assertTrue(map.isEmpty());
+ assertEquals(map.get(1), 0);
+ }
+
+ @Test
+ public void testRehash() {
+ Long2IntOpenHashMap map = new Long2IntOpenHashMap(4);
+ for (int i = 0; i < 100; i++) {
+ map.put(i, i * 10);
+ }
+ for (int i = 0; i < 100; i++) {
+ assertEquals(map.get(i), i * 10);
+ }
+ }
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java
new file mode 100644
index 00000000000..6e33a5b5315
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/Long2ObjectOpenHashMapTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.testng.annotations.Test;
+
+public class Long2ObjectOpenHashMapTest {
+
+ @Test
+ public void testEmpty() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ assertTrue(map.isEmpty());
+ assertEquals(map.size(), 0);
+ assertNull(map.get(0));
+ assertNull(map.get(1));
+ assertNull(map.remove(1));
+ }
+
+ @Test
+ public void testPutGet() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ assertNull(map.put(1, "one"));
+ assertNull(map.put(2, "two"));
+ assertNull(map.put(3, "three"));
+ assertEquals(map.size(), 3);
+ assertFalse(map.isEmpty());
+ assertEquals(map.get(1), "one");
+ assertEquals(map.get(2), "two");
+ assertEquals(map.get(3), "three");
+ assertNull(map.get(4));
+ }
+
+ @Test
+ public void testPutReplace() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ assertNull(map.put(1, "one"));
+ assertEquals(map.put(1, "ONE"), "one");
+ assertEquals(map.size(), 1);
+ assertEquals(map.get(1), "ONE");
+ }
+
+ @Test
+ public void testRemove() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ map.put(3, "three");
+ assertEquals(map.remove(2), "two");
+ assertEquals(map.size(), 2);
+ assertNull(map.get(2));
+ assertEquals(map.get(1), "one");
+ assertEquals(map.get(3), "three");
+ }
+
+ @Test
+ public void testComputeIfAbsent() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ assertEquals(map.computeIfAbsent(1, k -> "one"), "one");
+ assertEquals(map.size(), 1);
+ assertEquals(map.computeIfAbsent(1, k -> "other"), "one");
+ assertEquals(map.size(), 1);
+ }
+
+ @Test
+ public void testClear() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ map.clear();
+ assertTrue(map.isEmpty());
+ assertEquals(map.size(), 0);
+ assertNull(map.get(1));
+ }
+
+ @Test
+ public void testForEach() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ map.put(3, "three");
+ Map<Long, String> collected = new HashMap<>();
+ map.forEach(collected::put);
+ assertEquals(collected.size(), 3);
+ assertEquals(collected.get(1L), "one");
+ assertEquals(collected.get(2L), "two");
+ assertEquals(collected.get(3L), "three");
+ }
+
+ @Test
+ public void testValues() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ map.put(3, "three");
+ List<String> values = new ArrayList<>(map.values());
+ assertEquals(values.size(), 3);
+ assertTrue(values.contains("one"));
+ assertTrue(values.contains("two"));
+ assertTrue(values.contains("three"));
+ }
+
+ @Test
+ public void testRehash() {
+ Long2ObjectOpenHashMap<Integer> map = new Long2ObjectOpenHashMap<>(4);
+ for (int i = 0; i < 100; i++) {
+ map.put(i, i);
+ }
+ assertEquals(map.size(), 100);
+ for (int i = 0; i < 100; i++) {
+ assertEquals(map.get(i), Integer.valueOf(i));
+ }
+ }
+
+ @Test
+ public void testZeroKey() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ map.put(0, "zero");
+ assertEquals(map.get(0), "zero");
+ assertEquals(map.size(), 1);
+ assertEquals(map.remove(0), "zero");
+ assertTrue(map.isEmpty());
+ }
+
+ @Test
+ public void testNegativeKeys() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ map.put(-1, "neg1");
+ map.put(Long.MIN_VALUE, "min");
+ assertEquals(map.get(-1), "neg1");
+ assertEquals(map.get(Long.MIN_VALUE), "min");
+ }
+
+ @Test
+ public void testRemoveAndReinsert() {
+ Long2ObjectOpenHashMap<String> map = new Long2ObjectOpenHashMap<>();
+ for (int i = 0; i < 50; i++) {
+ map.put(i, "v" + i);
+ }
+ for (int i = 0; i < 25; i++) {
+ map.remove(i);
+ }
+ assertEquals(map.size(), 25);
+ for (int i = 25; i < 50; i++) {
+ assertEquals(map.get(i), "v" + i);
+ }
+ // Reinsert
+ for (int i = 0; i < 25; i++) {
+ map.put(i, "new" + i);
+ }
+ assertEquals(map.size(), 50);
+ for (int i = 0; i < 25; i++) {
+ assertEquals(map.get(i), "new" + i);
+ }
+ }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index b73f77212ee..647fde294c9 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -177,9 +177,6 @@ include("pulsar-broker-auth-oidc")
include("pulsar-broker-auth-sasl")
include("pulsar-client-auth-sasl")
-// Tier 9 — shaded utility modules (in core-modules)
-include("pulsar-client-dependencies-minimized")
-
// Tier 10 — shaded client modules (in core-modules)
include("pulsar-client-shaded")
include("pulsar-client-all")