This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c78f832753d [improve][misc] Migrate Yahoo DataSketches to Apache
DataSketches (#25965)
c78f832753d is described below
commit c78f832753d913d859faacb37f3caf2dea53ecda
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 8 14:52:18 2026 +0300
[improve][misc] Migrate Yahoo DataSketches to Apache DataSketches (#25965)
---
.../pulsar.client-shade-conventions.gradle.kts | 4 +--
distribution/server/src/assemble/LICENSE.bin.txt | 4 +--
distribution/shell/src/assemble/LICENSE.bin.txt | 4 +--
gradle/libs.versions.toml | 9 ++---
.../build.gradle.kts | 2 +-
.../bookkeeper/DataSketchesOpStatsLogger.java | 33 ++++++++----------
pulsar-broker/build.gradle.kts | 2 +-
.../metrics/DataSketchesOpStatsLogger.java | 20 +++++------
.../metrics/DataSketchesSummaryLogger.java | 14 ++++----
.../prometheus/metrics/ThreadLocalAccessor.java | 16 ++++-----
.../metrics/ThreadLocalAccessorTest.java | 14 ++++----
pulsar-client/build.gradle.kts | 2 +-
.../client/impl/ProducerStatsRecorderImpl.java | 40 +++++++++++++++-------
pulsar-functions/instance/build.gradle.kts | 2 +-
pulsar-functions/localrun-shaded/build.gradle.kts | 5 ++-
15 files changed, 88 insertions(+), 83 deletions(-)
diff --git
a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
index 1435aaef76d..5aacd9c48b9 100644
---
a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
+++
b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
@@ -51,7 +51,7 @@
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
include(dependency("org.eclipse.angus:angus-activation"))
include(dependency("com.thoughtworks.paranamer:paranamer"))
include(dependency("com.typesafe.netty:netty-reactive-streams"))
- include(dependency("com.yahoo.datasketches:.*"))
+ include(dependency("org.apache.datasketches:.*"))
include(dependency("commons-.*:.*"))
include(dependency("io.airlift:.*"))
include(dependency("io.grpc:.*"))
@@ -138,7 +138,6 @@
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
relocateWithPrefix(shadePrefix, "org.eclipse.angus")
relocateWithPrefix(shadePrefix, "com.thoughtworks.paranamer")
relocateWithPrefix(shadePrefix, "com.typesafe")
- relocateWithPrefix(shadePrefix, "com.yahoo")
relocateWithPrefix(shadePrefix, "io.airlift")
relocateWithPrefix(shadePrefix, "io.grpc")
relocateWithPrefix(shadePrefix, "io.netty")
@@ -156,6 +155,7 @@
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
relocateWithPrefix(shadePrefix, "org.aopalliance")
relocateWithPrefix(shadePrefix, "org.apache.bookkeeper")
relocateWithPrefix(shadePrefix, "org.apache.commons")
+ relocateWithPrefix(shadePrefix, "org.apache.datasketches")
relocateWithPrefix(shadePrefix, "org.apache.pulsar.checksum")
relocateWithPrefix(shadePrefix, "org.asynchttpclient")
relocateWithPrefix(shadePrefix, "org.checkerframework")
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index 957f2ab5851..edf92edd1c1 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -277,8 +277,8 @@ The Apache Software License, Version 2.0
- io.swagger.core.v3-swagger-annotations-jakarta-2.2.50.jar
* slog -- io.github.merlimat.slog-slog-0.9.7.jar
* DataSketches
- - com.yahoo.datasketches-memory-0.8.3.jar
- - com.yahoo.datasketches-sketches-core-0.8.3.jar
+ - org.apache.datasketches-datasketches-java-7.0.1.jar
+ - org.apache.datasketches-datasketches-memory-4.1.0.jar
* Apache Commons
- commons-beanutils-commons-beanutils-1.11.0.jar
- commons-cli-commons-cli-1.11.0.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 0c2e2e2ff47..f90a5d69350 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -336,8 +336,8 @@ The Apache Software License, Version 2.0
* Netty Reactive Streams -- netty-reactive-streams-2.0.6.jar
* Swagger -- swagger-annotations-jakarta-2.2.50.jar
* DataSketches
- - memory-0.8.3.jar
- - sketches-core-0.8.3.jar
+ - datasketches-java-7.0.1.jar
+ - datasketches-memory-4.1.0.jar
* Apache Commons
- commons-codec-1.20.0.jar
- commons-io-2.21.0.jar
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index c624875b70a..870a763fc52 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -121,7 +121,6 @@ oshi = "6.4.0"
netty-reactive-streams = "2.0.6"
cron-utils = "9.1.6"
failsafe = "3.3.2"
-sketches = "0.8.3"
disruptor = "3.4.3"
ant = "1.10.12"
guice = "5.1.0"
@@ -166,6 +165,7 @@ json = "20231013"
consolecaptor = "1.0.3"
restassured = "5.4.0"
skyscreamer = "1.5.0"
+# misc
zstd-jni = "1.5.7-3"
lz4java = "1.10.3"
spring = "6.2.12"
@@ -174,6 +174,8 @@ aws-sdk = "1.12.788"
hadoop3 = "3.5.0"
jclouds = "2.6.0"
thrift = "0.23.0"
+datasketches-memory = "4.1.0"
+datasketches-java = "7.0.1"
# Shading
shadow = "9.4.1"
@@ -375,7 +377,6 @@ vertx-web = { module = "io.vertx:vertx-web", version.ref =
"vertx" }
avro = { module = "org.apache.avro:avro", version.ref = "avro" }
avro-protobuf = { module = "org.apache.avro:avro-protobuf", version.ref =
"avro" }
joda-time = "joda-time:joda-time:2.10.10"
-sketches-core = { module = "com.yahoo.datasketches:sketches-core", version.ref
= "sketches" }
java-semver = { module = "com.github.zafarkhaja:java-semver", version.ref =
"java-semver" }
oshi-core = { module = "com.github.oshi:oshi-core-java11", version.ref =
"oshi" }
jna = { module = "net.java.dev.jna:jna", version.ref = "jna" }
@@ -466,8 +467,8 @@ commons-logging = { module =
"commons-logging:commons-logging", version.ref = "c
commons-beanutils = { module = "commons-beanutils:commons-beanutils",
version.ref = "commons-beanutils" }
commons-configuration2 = { module =
"org.apache.commons:commons-configuration2", version.ref =
"commons-configuration2" }
bookkeeper-stats-api = { module =
"org.apache.bookkeeper.stats:bookkeeper-stats-api", version.ref = "bookkeeper" }
-datasketches-memory = "org.apache.datasketches:datasketches-memory:2.2.0"
-datasketches-java = "org.apache.datasketches:datasketches-java:6.1.1"
+datasketches-memory = { module =
"org.apache.datasketches:datasketches-memory", version.ref =
"datasketches-memory" }
+datasketches-java = { module = "org.apache.datasketches:datasketches-java",
version.ref = "datasketches-java" }
[plugins]
lightproto = { id = "io.streamnative.lightproto", version.ref = "lightproto" }
diff --git
a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
index 2870a236654..f8c8b5d055c 100644
--- a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
+++ b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
@@ -31,7 +31,7 @@ dependencies {
implementation(libs.jetty.ee8.servlet)
implementation(libs.guava)
implementation(libs.netty.common)
- implementation(libs.sketches.core)
+ implementation(libs.datasketches.java)
testImplementation(libs.netty.buffer)
}
diff --git
a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
index 5c5bf560799..f9f6a882ca6 100644
---
a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
+++
b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
@@ -18,10 +18,6 @@
*/
package org.apache.pulsar.metrics.prometheus.bookkeeper;
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
-import com.yahoo.sketches.quantiles.DoublesUnion;
-import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,6 +26,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.datasketches.kll.KllDoublesSketch;
/**
* OpStatsLogger implementation that uses DataSketches library to calculate
the approximated latency quantiles.
@@ -45,8 +42,8 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
/*
* These are the sketches where all the aggregated results are published.
*/
- private volatile DoublesSketch successResult;
- private volatile DoublesSketch failResult;
+ private volatile KllDoublesSketch successResult;
+ private volatile KllDoublesSketch failResult;
private final LongAdder successCountAdder = new LongAdder();
private final LongAdder failCountAdder = new LongAdder();
@@ -147,22 +144,22 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
current = replacement;
replacement = local;
- final DoublesUnion aggregateSuccesss = new
DoublesUnionBuilder().build();
- final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
+ final KllDoublesSketch aggregateSuccess =
KllDoublesSketch.newHeapInstance();
+ final KllDoublesSketch aggregateFail =
KllDoublesSketch.newHeapInstance();
local.map.forEach((localData, b) -> {
long stamp = localData.lock.writeLock();
try {
- aggregateSuccesss.update(localData.successSketch);
- localData.successSketch.reset();
- aggregateFail.update(localData.failSketch);
- localData.failSketch.reset();
+ aggregateSuccess.merge(localData.successSketch);
+ aggregateFail.merge(localData.failSketch);
+ localData.successSketch = KllDoublesSketch.newHeapInstance();
+ localData.failSketch = KllDoublesSketch.newHeapInstance();
} finally {
localData.lock.unlockWrite(stamp);
}
});
- successResult = aggregateSuccesss.getResultAndReset();
- failResult = aggregateFail.getResultAndReset();
+ successResult = aggregateSuccess;
+ failResult = aggregateFail;
}
public long getCount(boolean success) {
@@ -174,8 +171,8 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
}
public double getQuantileValue(boolean success, double quantile) {
- DoublesSketch s = success ? successResult : failResult;
- return s != null ? s.getQuantile(quantile) : Double.NaN;
+ KllDoublesSketch s = success ? successResult : failResult;
+ return (s != null && !s.isEmpty()) ? s.getQuantile(quantile) :
Double.NaN;
}
public Map<String, String> getLabels() {
@@ -192,8 +189,8 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
}
private static class LocalData {
- private final DoublesSketch successSketch = new
DoublesSketchBuilder().build();
- private final DoublesSketch failSketch = new
DoublesSketchBuilder().build();
+ private KllDoublesSketch successSketch =
KllDoublesSketch.newHeapInstance();
+ private KllDoublesSketch failSketch =
KllDoublesSketch.newHeapInstance();
private final StampedLock lock = new StampedLock();
}
diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index 7dbd9e60de3..af9561e9a3b 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -92,7 +92,7 @@ dependencies {
implementation(libs.bookkeeper.server)
implementation(libs.bookkeeper.circe.checksum)
implementation(libs.caffeine)
- implementation(libs.sketches.core)
+ implementation(libs.datasketches.java)
implementation(libs.netty.codec.haproxy)
implementation(libs.opentelemetry.sdk.extension.autoconfigure)
implementation(libs.jetty.ee10.websocket.jetty.server)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
index 8973ba6a25c..e4e9a668fd9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -18,13 +18,11 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesUnion;
-import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.datasketches.kll.KllDoublesSketch;
/**
* OpStatsLogger implementation that uses DataSketches library to calculate
the approximated latency quantiles.
@@ -40,8 +38,8 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
/**
* These are the sketches where all the aggregated results are published.
*/
- private volatile DoublesSketch successResult;
- private volatile DoublesSketch failResult;
+ private volatile KllDoublesSketch successResult;
+ private volatile KllDoublesSketch failResult;
private final LongAdder successCountAdder = new LongAdder();
private final LongAdder failCountAdder = new LongAdder();
@@ -104,12 +102,12 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
current = replacement;
replacement = local;
- final DoublesUnion aggregateSuccess = new
DoublesUnionBuilder().build();
- final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
+ final KllDoublesSketch aggregateSuccess =
KllDoublesSketch.newHeapInstance();
+ final KllDoublesSketch aggregateFail =
KllDoublesSketch.newHeapInstance();
local.record(aggregateSuccess, aggregateFail);
- successResult = aggregateSuccess.getResultAndReset();
- failResult = aggregateFail.getResultAndReset();
+ successResult = aggregateSuccess;
+ failResult = aggregateFail;
}
public long getCount(boolean success) {
@@ -121,7 +119,7 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
}
public double getQuantileValue(boolean success, double quantile) {
- DoublesSketch s = success ? successResult : failResult;
- return s != null ? s.getQuantile(quantile) : Double.NaN;
+ KllDoublesSketch s = success ? successResult : failResult;
+ return (s != null && !s.isEmpty()) ? s.getQuantile(quantile) :
Double.NaN;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
index 7495f057aa0..66c8b241f68 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
@@ -18,12 +18,10 @@
*/
package org.apache.pulsar.broker.stats.prometheus.metrics;
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesUnion;
-import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.datasketches.kll.KllDoublesSketch;
public class DataSketchesSummaryLogger {
@@ -36,7 +34,7 @@ public class DataSketchesSummaryLogger {
/*
* These are the sketches where all the aggregated results are published.
*/
- private volatile DoublesSketch values;
+ private volatile KllDoublesSketch values;
private final LongAdder countAdder = new LongAdder();
private final DoubleAdder sumAdder = new DoubleAdder();
@@ -60,10 +58,10 @@ public class DataSketchesSummaryLogger {
current = replacement;
replacement = local;
- final DoublesUnion aggregateValues = new DoublesUnionBuilder().build();
+ final KllDoublesSketch aggregateValues =
KllDoublesSketch.newHeapInstance();
local.record(aggregateValues, null);
- values = aggregateValues.getResultAndReset();
+ values = aggregateValues;
}
public long getCount() {
@@ -75,7 +73,7 @@ public class DataSketchesSummaryLogger {
}
public double getQuantileValue(double quantile) {
- DoublesSketch s = values;
- return s != null ? s.getQuantile(quantile) : Double.NaN;
+ KllDoublesSketch s = values;
+ return (s != null && !s.isEmpty()) ? s.getQuantile(quantile) :
Double.NaN;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
index 6a32ce5b905..c0a9f31af0a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
@@ -19,14 +19,12 @@
package org.apache.pulsar.broker.stats.prometheus.metrics;
import com.google.common.annotations.VisibleForTesting;
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
-import com.yahoo.sketches.quantiles.DoublesUnion;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;
+import org.apache.datasketches.kll.KllDoublesSketch;
import org.jspecify.annotations.Nullable;
class ThreadLocalAccessor {
@@ -47,7 +45,7 @@ class ThreadLocalAccessor {
}
};
- void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion
aggregateFail) {
+ void record(KllDoublesSketch aggregateSuccess, @Nullable KllDoublesSketch
aggregateFail) {
map.keySet().forEach(key -> {
key.record(aggregateSuccess, aggregateFail);
if (key.shouldRemove()) {
@@ -67,8 +65,8 @@ class ThreadLocalAccessor {
static class LocalData {
- private final DoublesSketch successSketch = new
DoublesSketchBuilder().build();
- private final DoublesSketch failSketch = new
DoublesSketchBuilder().build();
+ private final KllDoublesSketch successSketch =
KllDoublesSketch.newHeapInstance();
+ private final KllDoublesSketch failSketch =
KllDoublesSketch.newHeapInstance();
private final StampedLock lock = new StampedLock();
// Keep a weak reference to the owner thread so that we can remove the
LocalData when the thread
// is not alive anymore or has been garbage collected.
@@ -100,13 +98,13 @@ class ThreadLocalAccessor {
}
}
- void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion
aggregateFail) {
+ void record(KllDoublesSketch aggregateSuccess, @Nullable
KllDoublesSketch aggregateFail) {
long stamp = lock.writeLock();
try {
- aggregateSuccess.update(successSketch);
+ aggregateSuccess.merge(successSketch);
successSketch.reset();
if (aggregateFail != null) {
- aggregateFail.update(failSketch);
+ aggregateFail.merge(failSketch);
failSketch.reset();
}
} finally {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
index 94c8337307d..3ccebd5581d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.broker.stats.prometheus.metrics;
import static org.testng.Assert.assertEquals;
-import com.yahoo.sketches.quantiles.DoublesUnion;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.util.concurrent.Phaser;
+import org.apache.datasketches.kll.KllDoublesSketch;
import org.jspecify.annotations.Nullable;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -33,19 +33,19 @@ public class ThreadLocalAccessorTest {
return new Object[][] {
// 1st element: whether the thread is a FastThreadLocalThread
// 2nd element: the 2nd argument passed to the
`ThreadLocalAccessor#record` method
- { true, DoublesUnion.builder().build() },
+ { true, KllDoublesSketch.newHeapInstance() },
{ true, null },
- { false, DoublesUnion.builder().build() },
+ { false, KllDoublesSketch.newHeapInstance() },
{ false, null },
};
}
@Test(dataProvider = "provider")
public void testShouldRemoveLocalDataWhenOwnerThreadIsNotAlive(
- boolean fastThreadLocalThread, @Nullable DoublesUnion
aggregateFail) throws Exception {
+ boolean fastThreadLocalThread, @Nullable KllDoublesSketch
aggregateFail) throws Exception {
// given a ThreadLocalAccessor instance
final var threadLocalAccessor = new ThreadLocalAccessor();
- DoublesUnion aggregateSuccess = DoublesUnion.builder().build();
+ KllDoublesSketch aggregateSuccess = KllDoublesSketch.newHeapInstance();
// using phaser to synchronize threads
Phaser phaser = new Phaser(2);
Thread thread = getThread(fastThreadLocalThread, () -> {
@@ -75,13 +75,13 @@ public class ThreadLocalAccessorTest {
}
@Test(dataProvider = "provider")
- public void testThreadGc(boolean fastThreadLocalThread, @Nullable
DoublesUnion aggregateFail) throws Exception {
+ public void testThreadGc(boolean fastThreadLocalThread, @Nullable
KllDoublesSketch aggregateFail) throws Exception {
final var accessor = new ThreadLocalAccessor();
getThread(fastThreadLocalThread, accessor::getLocalData).join();
System.gc();
// FastThreadLocalThread removes the LocalData from the map when the
thread finishes
assertEquals(accessor.getLocalDataCount(), fastThreadLocalThread ? 0 :
1);
- accessor.record(DoublesUnion.builder().build(), aggregateFail);
+ accessor.record(KllDoublesSketch.newHeapInstance(), aggregateFail);
assertEquals(accessor.getLocalDataCount(), 0);
}
diff --git a/pulsar-client/build.gradle.kts b/pulsar-client/build.gradle.kts
index f0618cd7efe..43c382b2478 100644
--- a/pulsar-client/build.gradle.kts
+++ b/pulsar-client/build.gradle.kts
@@ -47,7 +47,7 @@ dependencies {
implementation(libs.netty.reactive.streams)
implementation(libs.slog)
implementation(libs.commons.codec)
- implementation(libs.sketches.core)
+ implementation(libs.datasketches.java)
implementation(libs.gson)
implementation(libs.avro) {
exclude(group = "org.slf4j")
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 19a49158c66..f8ad90be352 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.client.impl;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
-import com.yahoo.sketches.quantiles.DoublesSketch;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.text.DecimalFormat;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import lombok.CustomLog;
+import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -53,9 +54,9 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
private final LongAdder totalAcksReceived;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new
DecimalFormat("0.00");
- private final transient DoublesSketch ds;
- private final transient DoublesSketch batchSizeDs;
- private final transient DoublesSketch msgSizeDs;
+ private final transient KllDoublesSketch ds;
+ private final transient KllDoublesSketch batchSizeDs;
+ private final transient KllDoublesSketch msgSizeDs;
private volatile double sendMsgsRate;
private volatile double sendBytesRate;
@@ -74,9 +75,9 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
- ds = DoublesSketch.builder().build(256);
- batchSizeDs = DoublesSketch.builder().build(256);
- msgSizeDs = DoublesSketch.builder().build(256);
+ ds = KllDoublesSketch.newHeapInstance(256);
+ batchSizeDs = KllDoublesSketch.newHeapInstance(256);
+ msgSizeDs = KllDoublesSketch.newHeapInstance(256);
}
public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient,
ProducerConfigurationData conf,
@@ -92,9 +93,9 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
- ds = DoublesSketch.builder().build(256);
- batchSizeDs = DoublesSketch.builder().build(256);
- msgSizeDs = DoublesSketch.builder().build(256);
+ ds = KllDoublesSketch.newHeapInstance(256);
+ batchSizeDs = KllDoublesSketch.newHeapInstance(256);
+ msgSizeDs = KllDoublesSketch.newHeapInstance(256);
init(conf);
}
@@ -154,17 +155,17 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalAcksReceived.add(currentNumAcksReceived);
synchronized (ds) {
- latencyPctValues = ds.getQuantiles(PERCENTILES);
+ latencyPctValues = getQuantiles(ds);
ds.reset();
}
synchronized (batchSizeDs) {
- batchSizePctValues = batchSizeDs.getQuantiles(PERCENTILES);
+ batchSizePctValues = getQuantiles(batchSizeDs);
batchSizeDs.reset();
}
synchronized (msgSizeDs) {
- msgSizePctValues = msgSizeDs.getQuantiles(PERCENTILES);
+ msgSizePctValues = getQuantiles(msgSizeDs);
msgSizeDs.reset();
}
@@ -206,6 +207,19 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
}
}
+ /**
+ * Returns the configured percentile quantiles for the given sketch.
KllDoublesSketch throws on an empty
+ * sketch, so an array of {@link Double#NaN} is returned in that case to
preserve the previous behavior.
+ */
+ private static double[] getQuantiles(KllDoublesSketch sketch) {
+ if (sketch.isEmpty()) {
+ double[] values = new double[PERCENTILES.length];
+ Arrays.fill(values, Double.NaN);
+ return values;
+ }
+ return sketch.getQuantiles(PERCENTILES);
+ }
+
@Override
public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
numMsgsSent.add(numMsgs);
diff --git a/pulsar-functions/instance/build.gradle.kts
b/pulsar-functions/instance/build.gradle.kts
index e474f8e68e5..968f5b200a3 100644
--- a/pulsar-functions/instance/build.gradle.kts
+++ b/pulsar-functions/instance/build.gradle.kts
@@ -43,7 +43,7 @@ dependencies {
implementation(libs.simpleclient.caffeine)
implementation(libs.simpleclient.httpserver)
implementation(libs.prometheus.jmx.collector)
- implementation(libs.sketches.core)
+ implementation(libs.datasketches.java)
implementation(libs.jackson.databind)
implementation(libs.netty.buffer)
implementation(libs.netty.common)
diff --git a/pulsar-functions/localrun-shaded/build.gradle.kts
b/pulsar-functions/localrun-shaded/build.gradle.kts
index 882c0cbbbd1..2ff2100578f 100644
--- a/pulsar-functions/localrun-shaded/build.gradle.kts
+++ b/pulsar-functions/localrun-shaded/build.gradle.kts
@@ -64,7 +64,7 @@ tasks.shadowJar {
include(dependency("info.picocli:.*"))
include(dependency("net.jodah:.*"))
include(dependency("io.airlift:.*"))
- include(dependency("com.yahoo.datasketches:.*"))
+ include(dependency("org.apache.datasketches:.*"))
}
// Exclude bouncycastle from pulsar-client (signatures would break if
shaded)
@@ -80,8 +80,6 @@ tasks.shadowJar {
relocateWithPrefix(shadePrefix, "com.squareup.okhttp")
relocateWithPrefix(shadePrefix, "com.squareup.okio")
relocateWithPrefix(shadePrefix, "com.thoughtworks.paranamer")
- relocateWithPrefix(shadePrefix, "com.yahoo.datasketches")
- relocateWithPrefix(shadePrefix, "com.yahoo.sketches")
relocateWithPrefix(shadePrefix, "commons-cli")
relocateWithPrefix(shadePrefix, "commons-codec")
relocateWithPrefix(shadePrefix, "commons-io")
@@ -111,6 +109,7 @@ tasks.shadowJar {
relocateWithPrefix(shadePrefix, "org.apache.bookkeeper")
relocateWithPrefix(shadePrefix, "org.apache.commons")
relocateWithPrefix(shadePrefix, "org.apache.curator")
+ relocateWithPrefix(shadePrefix, "org.apache.datasketches")
relocateWithPrefix(shadePrefix, "org.apache.distributedlog")
relocateWithPrefix(shadePrefix, "org.apache.jute")
relocateWithPrefix(shadePrefix, "org.apache.yetus")