FrankYang0529 commented on code in PR #19523:
URL: https://github.com/apache/kafka/pull/19523#discussion_r2066016813
##########
gradle/dependencies.gradle:
##########
@@ -147,6 +148,7 @@ libs += [
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsValidator:
"commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",
Review Comment:
Based on benchmark result, lz4 has better performance in our case. I will
change to use it.
```
Benchmark                      (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  Mode  Cnt     Score    Error  Units
TopicHashBenchmark.testLz4                         10                    10                    3  avgt   15   194.553 ±  1.631  ns/op
TopicHashBenchmark.testLz4                         10                    50                    3  avgt   15   484.640 ±  1.721  ns/op
TopicHashBenchmark.testLz4                         10                   100                    3  avgt   15   883.435 ±  4.001  ns/op
TopicHashBenchmark.testMurmur                      10                    10                    3  avgt   15   205.529 ±  0.701  ns/op
TopicHashBenchmark.testMurmur                      10                    50                    3  avgt   15  1066.528 ± 42.856  ns/op
TopicHashBenchmark.testMurmur                      10                   100                    3  avgt   15  2082.821 ± 10.935  ns/op
```
<details>
<summary>TopicHashBenchmark.java</summary>
```java
package org.apache.kafka.jmh.metadata;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.Murmur3;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.image.ClusterDelta;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.metadata.BrokerRegistration;

import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta;
import static org.apache.kafka.jmh.metadata.TopicsImageSnapshotLoadBenchmark.getNumBrokers;

/**
 * Compares two candidate hash functions for a serialized topic description:
 * lz4-java's XXHash64 ({@link #testLz4()}) versus Kafka's internal Murmur3
 * ({@link #testMurmur()}).
 *
 * <p>{@link #setup()} serializes one topic (magic byte, topic id, name,
 * partition count, and per-partition sorted rack strings) into an in-memory
 * buffer once per trial; each benchmark method then hashes that same buffer.
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class TopicHashBenchmark {
    @Param({"10", "50", "100"})
    private int partitionsPerTopic;

    @Param({"3"})
    private int replicationFactor;

    @Param({"10"})
    private int numReplicasPerBroker;

    // Shared serialization buffer; filled once in setup() and hashed by both benchmarks.
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    private final DataOutputStream dos = new DataOutputStream(baos);

    /**
     * Builds a single-topic metadata image plus a matching cluster image
     * (one registered broker per replica, each with a random rack), then
     * serializes the topic description into {@link #baos}.
     *
     * @throws IOException if writing to the in-memory stream fails
     */
    @Setup(Level.Trial)
    public void setup() throws IOException {
        TopicsDelta topicsDelta = getInitialTopicsDelta(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        int numBrokers = getNumBrokers(1, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
        ClusterDelta clusterDelta = new ClusterDelta(ClusterImage.EMPTY);
        for (int i = 0; i < numBrokers; i++) {
            clusterDelta.replay(new RegisterBrokerRecord()
                .setBrokerId(i)
                .setRack(Uuid.randomUuid().toString())
            );
        }
        TopicImage topicImage = topicsDelta.apply().topicsById().values().stream().findFirst().get();
        ClusterImage clusterImage = clusterDelta.apply();

        dos.writeByte(0); // magic byte
        dos.writeLong(topicImage.id().hashCode()); // topic ID
        dos.writeUTF(topicImage.name()); // topic name
        dos.writeInt(topicImage.partitions().size()); // number of partitions
        for (int i = 0; i < topicImage.partitions().size(); i++) {
            dos.writeInt(i); // partition id
            // Racks of this partition's replicas, sorted so the hash is order-independent.
            List<String> sortedRacksList = Arrays.stream(topicImage.partitions().get(i).replicas)
                .mapToObj(clusterImage::broker)
                .filter(Objects::nonNull)
                .map(BrokerRegistration::rack)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .sorted()
                .toList();
            String racks = IntStream.range(0, sortedRacksList.size())
                .mapToObj(idx -> idx + ":" + sortedRacksList.get(idx)) // Format: "index:value"
                .collect(Collectors.joining(",")); // Separator between "index:value" pairs
            dos.writeUTF(racks); // sorted racks
        }
        dos.flush();
    }

    /** Releases the in-memory streams at the end of the trial. */
    @TearDown(Level.Trial)
    public void tearDown() throws IOException {
        dos.close();
        baos.close();
    }

    /** Hashes the serialized topic with lz4-java's XXHash64 (seed 0). */
    @Benchmark
    public void testLz4() {
        // NOTE(review): fastestInstance() lookup is inside the measured method, so
        // its cost is included in the score — presumably intentional; confirm.
        XXHash64 hash = XXHashFactory.fastestInstance().hash64();
        hash.hash(baos.toByteArray(), 0, baos.size(), 0);
    }

    /** Hashes the serialized topic with Kafka's internal Murmur3 (64-bit). */
    @Benchmark
    public void testMurmur() {
        Murmur3.hash64(baos.toByteArray());
    }
}
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]