This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9dfc5a72423 MINOR: Prefer MetadataRecordSerde.INSTANCE over new
instances (#22289)
9dfc5a72423 is described below
commit 9dfc5a724239042ed6add3e87c0041e3ef9c2dee
Author: Jiayao Sun <[email protected]>
AuthorDate: Sat May 16 14:06:36 2026 +1200
MINOR: Prefer MetadataRecordSerde.INSTANCE over new instances (#22289)
Replace `new MetadataRecordSerde()` with `MetadataRecordSerde.INSTANCE`
across the codebase. MetadataRecordSerde is stateless and thread-safe,
so a single shared instance is sufficient.
Also add Javadoc on the public constructor to guide future callers
toward the singleton.
Relevant discussions:
- https://github.com/apache/kafka/pull/22116#discussion_r3237099653
- https://github.com/apache/kafka/pull/10990#discussion_r666715085
Verified that MetadataRecordSerde (and its parent
AbstractApiMessageSerde) has no mutable instance fields. All methods
(read/write/recordSize) operate solely on their parameters and local
variables, confirming no thread-safety or state-recycling concerns with
sharing a single instance.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/SharedServer.scala | 2 +-
.../apache/kafka/metadata/MetadataRecordSerde.java | 7 +++++
.../apache/kafka/metadata/storage/Formatter.java | 2 +-
.../kafka/metadata/util/BatchFileReader.java | 4 +--
.../kafka/metadata/util/BatchFileWriter.java | 2 +-
.../kafka/metadata/util/SnapshotFileReader.java | 3 +-
.../apache/kafka/controller/MockRaftClient.java | 6 ++--
.../kafka/metadata/MetadataRecordSerdeTest.java | 36 +++++++++-------------
.../kafka/server/RaftClusterSnapshotTest.java | 2 +-
.../org/apache/kafka/tools/DumpLogSegments.java | 3 +-
.../apache/kafka/tools/DumpLogSegmentsTest.java | 5 ++-
11 files changed, 33 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index 19f88f62bb7..fcc39b94bec 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -294,7 +294,7 @@ class SharedServer(
clusterId,
sharedServerConfig,
metaPropsEnsemble.logDirProps.get(metaPropsEnsemble.metadataLogDir.get).directoryId.get,
- new MetadataRecordSerde,
+ MetadataRecordSerde.INSTANCE,
KafkaRaftServer.MetadataPartition,
KafkaRaftServer.MetadataTopicId,
time,
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
index 7964fed11c2..de04cf8f58a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
@@ -23,6 +23,13 @@ import
org.apache.kafka.server.common.serialization.AbstractApiMessageSerde;
public class MetadataRecordSerde extends AbstractApiMessageSerde {
public static final MetadataRecordSerde INSTANCE = new
MetadataRecordSerde();
+ /**
+ * Prefer using {@link #INSTANCE} instead of creating new instances,
+ * as this class is stateless and thread-safe.
+ */
+ public MetadataRecordSerde() {
+ }
+
@Override
public ApiMessage apiMessageFor(short apiKey) {
return MetadataRecordType.fromId(apiKey).newMetadataRecord();
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 3d3d3226826..1ccf0c3d5c0 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -506,7 +506,7 @@ public class Formatter {
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
setVoterSet(Optional.of(voterSet));
- try (RecordsSnapshotWriter<ApiMessageAndVersion> writer =
builder.build(new MetadataRecordSerde())) {
+ try (RecordsSnapshotWriter<ApiMessageAndVersion> writer =
builder.build(MetadataRecordSerde.INSTANCE)) {
writer.freeze();
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index 1868a3fbb6c..71957dce6cf 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -74,12 +74,10 @@ public final class BatchFileReader implements
Iterator<BatchFileReader.BatchAndT
private final FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
- private final MetadataRecordSerde serde;
private BatchFileReader(FileRecords fileRecords) {
this.fileRecords = fileRecords;
this.batchIterator = fileRecords.batchIterator();
- this.serde = new MetadataRecordSerde();
}
@Override
@@ -142,7 +140,7 @@ public final class BatchFileReader implements
Iterator<BatchFileReader.BatchAndT
for (Record record : input) {
try {
ByteBufferAccessor accessor = new
ByteBufferAccessor(record.value());
- ApiMessageAndVersion messageAndVersion = serde.read(accessor,
record.valueSize());
+ ApiMessageAndVersion messageAndVersion =
MetadataRecordSerde.INSTANCE.read(accessor, record.valueSize());
messages.add(messageAndVersion);
} catch (Throwable e) {
throw new RuntimeException("unable to deserialize record at
offset " + record.offset(), e);
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
index 2515821f549..f361f996997 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
@@ -97,7 +97,7 @@ public class BatchFileWriter implements AutoCloseable {
new BatchMemoryPool(5, MAX_BATCH_SIZE_BYTES),
time,
Compression.NONE,
- new MetadataRecordSerde()
+ MetadataRecordSerde.INSTANCE
);
// Append the snapshot header control record and force it to create a
batch
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index fc5a47b1ea5..ea3d4fcd672 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -58,7 +58,6 @@ public final class SnapshotFileReader implements
AutoCloseable {
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
- private final MetadataRecordSerde serde = new MetadataRecordSerde();
private long lastOffset = -1L;
private volatile OptionalLong highWaterMark = OptionalLong.empty();
@@ -155,7 +154,7 @@ public final class SnapshotFileReader implements
AutoCloseable {
for (Record record : batch) {
ByteBufferAccessor accessor = new
ByteBufferAccessor(record.value());
try {
- ApiMessageAndVersion messageAndVersion = serde.read(accessor,
record.valueSize());
+ ApiMessageAndVersion messageAndVersion =
MetadataRecordSerde.INSTANCE.read(accessor, record.valueSize());
messages.add(messageAndVersion);
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}",
record.offset(), e);
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
index f84dea081a6..f817717a2b3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java
@@ -479,7 +479,7 @@ public final class MockRaftClient implements
RaftClient<ApiMessageAndVersion>, A
listenerData.handleLoadSnapshot(
RecordsSnapshotReader.of(
snapshot.get(),
- new MetadataRecordSerde(),
+ MetadataRecordSerde.INSTANCE,
BufferSupplier.create(),
Integer.MAX_VALUE,
true,
@@ -554,7 +554,7 @@ public final class MockRaftClient implements
RaftClient<ApiMessageAndVersion>, A
}
private static int messageSize(ApiMessageAndVersion messageAndVersion,
ObjectSerializationCache objectCache) {
- return new MetadataRecordSerde().recordSize(messageAndVersion,
objectCache);
+ return MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion,
objectCache);
}
public void beginShutdown() {
@@ -729,7 +729,7 @@ public final class MockRaftClient implements
RaftClient<ApiMessageAndVersion>, A
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setTime(new MockTime())
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
- .build(new MetadataRecordSerde())
+ .build(MetadataRecordSerde.INSTANCE)
);
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index 16cb21ee289..c6efa8173c9 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -41,22 +41,22 @@ class MetadataRecordSerdeTest {
.setName("foo")
.setTopicId(Uuid.randomUuid());
- MetadataRecordSerde serde = new MetadataRecordSerde();
+
for (short version = TopicRecord.LOWEST_SUPPORTED_VERSION; version <=
TopicRecord.HIGHEST_SUPPORTED_VERSION; version++) {
ApiMessageAndVersion messageAndVersion = new
ApiMessageAndVersion(topicRecord, version);
ObjectSerializationCache cache = new ObjectSerializationCache();
- int size = serde.recordSize(messageAndVersion, cache);
+ int size =
MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion, cache);
ByteBuffer buffer = ByteBuffer.allocate(size);
ByteBufferAccessor bufferAccessor = new ByteBufferAccessor(buffer);
- serde.write(messageAndVersion, cache, bufferAccessor);
+ MetadataRecordSerde.INSTANCE.write(messageAndVersion, cache,
bufferAccessor);
buffer.flip();
assertEquals(size, buffer.remaining());
- ApiMessageAndVersion readMessageAndVersion =
serde.read(bufferAccessor, size);
+ ApiMessageAndVersion readMessageAndVersion =
MetadataRecordSerde.INSTANCE.read(bufferAccessor, size);
assertEquals(messageAndVersion, readMessageAndVersion);
}
}
@@ -67,10 +67,9 @@ class MetadataRecordSerdeTest {
ByteUtils.writeUnsignedVarint(15, buffer);
buffer.flip();
- MetadataRecordSerde serde = new MetadataRecordSerde();
assertStartsWith("Could not deserialize metadata record due to unknown
frame version",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
16)).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), 16)).getMessage());
}
/**
@@ -78,7 +77,6 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingMalformedFrameVersionVarint() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
buffer.put((byte) 0x80);
@@ -91,7 +89,7 @@ class MetadataRecordSerdeTest {
buffer.limit(64);
assertStartsWith("Error while reading frame version",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
buffer.remaining())).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
}
/**
@@ -99,7 +97,6 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingMalformedMessageTypeVarint() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
buffer.put((byte) 0x01);
@@ -113,7 +110,7 @@ class MetadataRecordSerdeTest {
buffer.limit(64);
assertStartsWith("Error while reading type",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
buffer.remaining())).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
}
/**
@@ -121,7 +118,6 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingMalformedMessageVersionVarint() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
buffer.put((byte) 0x01);
@@ -135,7 +131,7 @@ class MetadataRecordSerdeTest {
buffer.limit(64);
assertStartsWith("Error while reading version",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
buffer.remaining())).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
}
/**
@@ -143,7 +139,6 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingVersionTooLarge() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.clear();
buffer.put((byte) 0x01); // frame version
@@ -157,7 +152,7 @@ class MetadataRecordSerdeTest {
buffer.limit(64);
assertStartsWith("Value for version was too large",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
buffer.remaining())).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
}
/**
@@ -165,7 +160,6 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingUnsupportedApiKey() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.put((byte) 0x01); // frame version
buffer.put((byte) 0xff); // apiKey
@@ -176,7 +170,7 @@ class MetadataRecordSerdeTest {
buffer.limit(64);
assertStartsWith("Unknown metadata id ",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
buffer.remaining())).getCause().getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), buffer.remaining())).getCause().getMessage());
}
/**
@@ -184,7 +178,6 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingMalformedMessage() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.put((byte) 0x01); // frame version
buffer.put((byte) 0x00); // apiKey
@@ -194,7 +187,7 @@ class MetadataRecordSerdeTest {
buffer.limit(4);
assertStartsWith("Failed to deserialize record with type",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer),
buffer.remaining())).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), buffer.remaining())).getMessage());
}
/**
@@ -202,19 +195,18 @@ class MetadataRecordSerdeTest {
*/
@Test
public void testParsingRecordWithGarbageAtEnd() {
- MetadataRecordSerde serde = new MetadataRecordSerde();
RegisterBrokerRecord message = new
RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(2);
ObjectSerializationCache cache = new ObjectSerializationCache();
ApiMessageAndVersion messageAndVersion = new
ApiMessageAndVersion(message, (short) 0);
- int size = serde.recordSize(messageAndVersion, cache);
+ int size = MetadataRecordSerde.INSTANCE.recordSize(messageAndVersion,
cache);
ByteBuffer buffer = ByteBuffer.allocate(size + 1);
- serde.write(messageAndVersion, cache, new ByteBufferAccessor(buffer));
+ MetadataRecordSerde.INSTANCE.write(messageAndVersion, cache, new
ByteBufferAccessor(buffer));
buffer.clear();
assertStartsWith("Found 1 byte(s) of garbage after",
assertThrows(MetadataParseException.class,
- () -> serde.read(new ByteBufferAccessor(buffer), size
+ 1)).getMessage());
+ () -> MetadataRecordSerde.INSTANCE.read(new
ByteBufferAccessor(buffer), size + 1)).getMessage());
}
private static void assertStartsWith(String prefix, String str) {
diff --git
a/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
index 616405103e2..0d58bf482fa 100644
--- a/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
+++ b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
@@ -100,7 +100,7 @@ public class RaftClusterSnapshotTest {
for (var raftManager : raftManagers.values()) {
try (var snapshot = RecordsSnapshotReader.of(
raftManager.raftLog().latestSnapshot().get(),
- new MetadataRecordSerde(),
+ MetadataRecordSerde.INSTANCE,
BufferSupplier.create(),
1,
true,
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index df4077060f6..0ac0b74b6bc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -731,13 +731,12 @@ public class DumpLogSegments {
}
private static class ClusterMetadataLogMessageParser implements
MessageParser<String, String> {
- private final MetadataRecordSerde metadataRecordSerde = new
MetadataRecordSerde();
@Override
public ParseResult<String, String> parse(Record record) {
String output;
try {
- ApiMessageAndVersion messageAndVersion =
metadataRecordSerde.read(
+ ApiMessageAndVersion messageAndVersion =
MetadataRecordSerde.INSTANCE.read(
new ByteBufferAccessor(record.value()),
record.valueSize());
ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
json.set("type", new TextNode(
diff --git
a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
index f919518cc9d..77fba98097c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
@@ -628,12 +628,11 @@ public class DumpLogSegmentsTest {
List<SimpleRecord> records = new ArrayList<>();
for (ApiMessageAndVersion message : metadataRecords) {
- MetadataRecordSerde serde = MetadataRecordSerde.INSTANCE;
ObjectSerializationCache cache = new ObjectSerializationCache();
- int size = serde.recordSize(message, cache);
+ int size = MetadataRecordSerde.INSTANCE.recordSize(message, cache);
ByteBuffer buf = ByteBuffer.allocate(size);
ByteBufferAccessor writer = new ByteBufferAccessor(buf);
- serde.write(message, cache, writer);
+ MetadataRecordSerde.INSTANCE.write(message, cache, writer);
buf.flip();
records.add(new SimpleRecord(null, buf.array()));
}