This is an automated email from the ASF dual-hosted git repository.
nodece pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 81a77530b83 [fix][broker] Replace Java serialization with JSON in
package metadata (#25570)
81a77530b83 is described below
commit 81a77530b831463fe23f4df38c6cdb66df9b5bdd
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Apr 23 12:33:58 2026 +0300
[fix][broker] Replace Java serialization with JSON in package metadata
(#25570)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 18 ++++
.../org/apache/pulsar/broker/PulsarService.java | 4 +-
pulsar-package-management/core/build.gradle.kts | 3 +
.../core/common/PackageMetadataUtil.java | 116 +++++++++++++++++++--
.../core/impl/PackagesManagementImpl.java | 46 +++++++-
.../core/common/PackageMetadataSerdeTest.java | 105 +++++++++++++++----
.../core/impl/PackagesManagementImplTest.java | 4 +-
7 files changed, 263 insertions(+), 33 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0931aa64114..b03af49c56e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -4065,6 +4065,24 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private String packagesManagementLedgerRootPath = "/ledgers";
+ @FieldContext(
+ category = CATEGORY_PACKAGES_MANAGEMENT,
+ doc = "Whether new package metadata writes use JSON (safe) or the
legacy Java serialization format. "
+ + "Defaults to true. Set to false only as a temporary rollback
path; the legacy format will be "
+ + "removed in a future release."
+ )
+ private boolean packagesManagementJsonSerializationEnabled = true;
+
+ @FieldContext(
+ category = CATEGORY_PACKAGES_MANAGEMENT,
+ doc = "Whether to accept reading legacy Java-serialized package
metadata written by older brokers. "
+ + "A strict ObjectInputFilter allowlist is applied unconditionally
whenever the legacy path "
+ + "runs. Defaults to true for upgrade compatibility; disable once
all existing packages have "
+ + "been re-uploaded or rewritten via updateMeta. This default is
scheduled to flip to false in "
+ + "a future release."
+ )
+ private boolean packagesManagementAllowLegacyJavaSerialization = true;
+
/* packages management service configurations (end) */
@FieldContext(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aba81d8a771..85bd9f53b1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -2074,7 +2074,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private void startPackagesManagementService() throws IOException {
// TODO: using provider to initialize the packages management service.
- this.packagesManagement = new PackagesManagementImpl();
+ this.packagesManagement = new PackagesManagementImpl(
+ config.isPackagesManagementJsonSerializationEnabled(),
+ config.isPackagesManagementAllowLegacyJavaSerialization());
PackagesStorageProvider storageProvider = PackagesStorageProvider
.newProvider(config.getPackagesManagementStorageProvider());
DefaultPackagesStorageConfiguration storageConfiguration = new
DefaultPackagesStorageConfiguration();
diff --git a/pulsar-package-management/core/build.gradle.kts
b/pulsar-package-management/core/build.gradle.kts
index be0e227bcfd..5928408502f 100644
--- a/pulsar-package-management/core/build.gradle.kts
+++ b/pulsar-package-management/core/build.gradle.kts
@@ -23,6 +23,9 @@ plugins {
dependencies {
api(project(":pulsar-client-admin-api"))
+ implementation(project(":pulsar-common"))
implementation(libs.guava)
implementation(libs.commons.lang3)
+ implementation(libs.jackson.databind)
+ implementation(libs.slf4j.api)
}
diff --git
a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadataUtil.java
b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadataUtil.java
index 27b37d06b1d..e6e6d2a84bc 100644
---
a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadataUtil.java
+++
b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/common/PackageMetadataUtil.java
@@ -18,23 +18,127 @@
*/
package org.apache.pulsar.packages.management.core.common;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputFilter;
+import java.io.ObjectInputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.SerializationUtils;
-import
org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import
org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException.MetadataFormatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PackageMetadataUtil {
- public static PackageMetadata fromBytes(byte[] bytes) throws
PackagesManagementException.MetadataFormatException {
+
+ private static final Logger log =
LoggerFactory.getLogger(PackageMetadataUtil.class);
+
+ private static final byte JSON_LEADING_BYTE = '{';
+ private static final byte JAVA_MAGIC_BYTE_0 = (byte) 0xAC;
+ private static final byte JAVA_MAGIC_BYTE_1 = (byte) 0xED;
+
+ // Strict allowlist for the legacy Java-serialized read path. Only classes
reachable from
+ // PackageMetadata's declared field graph are permitted; everything else
is rejected before
+ // instantiation by the JEP 290 filter, blocking gadget chain entry points.
+ private static final ObjectInputFilter LEGACY_FILTER =
ObjectInputFilter.Config.createFilter(
+ "maxdepth=5;maxrefs=100;maxbytes=1048576;maxarray=1024;"
+ +
"org.apache.pulsar.packages.management.core.common.PackageMetadata;"
+ +
"java.util.HashMap;java.util.LinkedHashMap;java.util.Map;java.util.Map$Entry;"
+ + "java.util.AbstractMap;"
+ +
"java.lang.String;java.lang.Number;java.lang.Long;java.lang.Boolean;"
+ + "java.lang.Object;"
+ + "!*");
+
+ private static final ObjectReader JSON_READER =
+
ObjectMapperFactory.getMapper().reader().forType(PackageMetadata.class);
+ private static final ObjectWriter JSON_WRITER =
ObjectMapperFactory.getMapper().writer();
+
+ private static final AtomicBoolean LEGACY_READ_WARNED = new
AtomicBoolean(false);
+
+ private PackageMetadataUtil() {
+ }
+
+ public static byte[] toBytes(PackageMetadata packageMetadata, boolean
jsonSerializationEnabled) {
+ if (jsonSerializationEnabled) {
+ try {
+ return JSON_WRITER.writeValueAsBytes(packageMetadata);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to serialize package
metadata as JSON", e);
+ }
+ }
+ return SerializationUtils.serialize(packageMetadata);
+ }
+
+ public static PackageMetadata fromBytes(byte[] bytes, boolean
allowLegacyJavaSerialization)
+ throws MetadataFormatException {
+ if (bytes == null || bytes.length == 0) {
+ throw new MetadataFormatException("Empty package metadata");
+ }
+ int firstNonWhitespace = indexOfFirstNonWhitespace(bytes);
+ if (firstNonWhitespace >= 0 && bytes[firstNonWhitespace] ==
JSON_LEADING_BYTE) {
+ return readJson(bytes);
+ }
+ if (bytes.length >= 2 && bytes[0] == JAVA_MAGIC_BYTE_0 && bytes[1] ==
JAVA_MAGIC_BYTE_1) {
+ if (!allowLegacyJavaSerialization) {
+ throw new MetadataFormatException(
+ "Package metadata is in legacy Java serialization
format but reading it is disabled. "
+ + "Enable
packagesManagementAllowLegacyJavaSerialization or re-upload the package.");
+ }
+ return readLegacy(bytes);
+ }
+ throw new MetadataFormatException("Unrecognized package metadata
format");
+ }
+
+ private static PackageMetadata readJson(byte[] bytes) throws
MetadataFormatException {
try {
- Object o = SerializationUtils.deserialize(bytes);
+ return JSON_READER.readValue(bytes);
+ } catch (IOException e) {
+ throw new MetadataFormatException("Failed to parse package
metadata as JSON: " + e.getMessage());
+ }
+ }
+
+ private static PackageMetadata readLegacy(byte[] bytes) throws
MetadataFormatException {
+ try (ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(bytes))) {
+ ois.setObjectInputFilter(LEGACY_FILTER);
+ Object o = ois.readObject();
if (!(o instanceof PackageMetadata)) {
- throw new
PackagesManagementException.MetadataFormatException("Unexpected metadata
format");
+ throw new MetadataFormatException("Unexpected metadata type: "
+ + (o == null ? "null" : o.getClass().getName()));
+ }
+ if (LEGACY_READ_WARNED.compareAndSet(false, true)) {
+ log.warn("Read a package metadata entry in the legacy Java
serialization format. "
+ + "Re-upload packages or call updateMeta to migrate
them to JSON, then disable "
+ + "packagesManagementAllowLegacyJavaSerialization.");
}
return (PackageMetadata) o;
+ } catch (MetadataFormatException e) {
+ throw e;
} catch (Exception e) {
- throw new
PackagesManagementException.MetadataFormatException("Unexpected error", e);
+ throw new MetadataFormatException("Rejected legacy package
metadata: " + e.getMessage());
}
}
+ private static int indexOfFirstNonWhitespace(byte[] bytes) {
+ for (int i = 0; i < bytes.length; i++) {
+ byte b = bytes[i];
+ if (b != ' ' && b != '\t' && b != '\n' && b != '\r') {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ // Source-compatible overloads for callers (including external
integrations) that haven't
+ // been updated to the explicit-flag form. Defaults track the production
defaults.
+ @Deprecated
public static byte[] toBytes(PackageMetadata packageMetadata) {
- return SerializationUtils.serialize(packageMetadata);
+ return toBytes(packageMetadata, true);
+ }
+
+ @Deprecated
+ public static PackageMetadata fromBytes(byte[] bytes) throws
MetadataFormatException {
+ return fromBytes(bytes, true);
}
}
diff --git
a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
index 7861d07c775..05f6e6aaf88 100644
---
a/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
+++
b/pulsar-package-management/core/src/main/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImpl.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
@@ -38,9 +39,45 @@ import
org.apache.pulsar.packages.management.core.exceptions.PackagesManagementE
/**
* Packages management implementation.
*/
+@Slf4j
public class PackagesManagementImpl implements PackagesManagement {
private PackagesStorage storage;
+ private final boolean jsonSerializationEnabled;
+ private final boolean allowLegacyJavaSerialization;
+
+ public PackagesManagementImpl() {
+ this(true, true, false);
+ }
+
+ public PackagesManagementImpl(boolean jsonSerializationEnabled, boolean
allowLegacyJavaSerialization) {
+ this(jsonSerializationEnabled, allowLegacyJavaSerialization, true);
+ }
+
+ private PackagesManagementImpl(boolean jsonSerializationEnabled, boolean
allowLegacyJavaSerialization,
+ boolean logConfigWarnings) {
+ this.jsonSerializationEnabled = jsonSerializationEnabled;
+ this.allowLegacyJavaSerialization = allowLegacyJavaSerialization;
+ if (logConfigWarnings) {
+ if (allowLegacyJavaSerialization) {
+ log.warn("Reading legacy Java-serialized package metadata is
enabled "
+ +
"(packagesManagementAllowLegacyJavaSerialization=true). Re-upload existing
packages "
+ + "or let organic updateMeta calls rewrite them as
JSON, then disable this flag. "
+ + "This default is scheduled to flip to false in a
future Pulsar release.");
+ }
+ if (!jsonSerializationEnabled) {
+ log.warn("Package metadata is being written in the legacy Java
serialization format "
+ + "(packagesManagementJsonSerializationEnabled=false).
JSON is preferred; this flag "
+ + "exists only for rollback and will be removed in a
future Pulsar release.");
+ }
+ if (!jsonSerializationEnabled && !allowLegacyJavaSerialization) {
+ log.warn("packagesManagementJsonSerializationEnabled=false
combined with "
+ +
"packagesManagementAllowLegacyJavaSerialization=false means new writes use Java
"
+ + "serialization but reads reject it — existing
entries will fail to load. This is "
+ + "likely a misconfiguration.");
+ }
+ }
+ }
@Override
public void initialize(PackagesStorage storage) {
@@ -85,7 +122,8 @@ public class PackagesManagementImpl implements
PackagesManagement {
future.completeExceptionally(throwable);
return;
}
- try (ByteArrayInputStream in = new
ByteArrayInputStream(PackageMetadataUtil.toBytes(metadata))) {
+ try (ByteArrayInputStream in = new ByteArrayInputStream(
+ PackageMetadataUtil.toBytes(metadata,
jsonSerializationEnabled))) {
storage.deleteAsync(metadataPath)
.thenCompose(aVoid -> storage.writeAsync(metadataPath,
in))
.whenComplete((aVoid, t) -> {
@@ -107,7 +145,8 @@ public class PackagesManagementImpl implements
PackagesManagement {
private CompletableFuture<Void> writeMeta(PackageName packageName,
PackageMetadata metadata) {
CompletableFuture<Void> future = new CompletableFuture<>();
String metadataPath = metadataPath(packageName);
- try (ByteArrayInputStream inputStream = new
ByteArrayInputStream(PackageMetadataUtil.toBytes(metadata))) {
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(
+ PackageMetadataUtil.toBytes(metadata,
jsonSerializationEnabled))) {
storage.writeAsync(metadataPath, inputStream)
.whenComplete((aVoid, t) -> {
if (t != null) {
@@ -236,7 +275,8 @@ public class PackagesManagementImpl implements
PackagesManagement {
private CompletableFuture<PackageMetadata>
metadataReadFromStream(ByteArrayOutputStream outputStream) {
CompletableFuture<PackageMetadata> future = new CompletableFuture<>();
try {
- PackageMetadata metadata =
PackageMetadataUtil.fromBytes(outputStream.toByteArray());
+ PackageMetadata metadata = PackageMetadataUtil.fromBytes(
+ outputStream.toByteArray(), allowLegacyJavaSerialization);
future.complete(metadata);
} catch (PackagesManagementException.MetadataFormatException e) {
future.completeExceptionally(e);
diff --git
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
index 01aa9d2fbdd..0ab441cd572 100644
---
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
+++
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/common/PackageMetadataSerdeTest.java
@@ -18,38 +18,101 @@
*/
package org.apache.pulsar.packages.management.core.common;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
import java.util.HashMap;
-import
org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException;
-import org.testng.Assert;
+import
org.apache.pulsar.packages.management.core.exceptions.PackagesManagementException.MetadataFormatException;
import org.testng.annotations.Test;
public class PackageMetadataSerdeTest {
- @Test
- public void testPackageMetadataSerDe() {
+
+ private static PackageMetadata sampleMetadata() {
HashMap<String, String> properties = new HashMap<>();
properties.put("testKey", "testValue");
- PackageMetadata metadata = PackageMetadata.builder()
- .description("test package metadata serialize and deserialize
flow")
- .createTime(System.currentTimeMillis())
- .contact("[email protected]")
- .modificationTime(System.currentTimeMillis() + 1000)
- .properties(properties).build();
+ return PackageMetadata.builder()
+ .description("test package metadata serialize and deserialize
flow")
+ .createTime(1_000L)
+ .contact("[email protected]")
+ .modificationTime(2_000L)
+ .properties(properties).build();
+ }
+
+ @Test
+ public void testJsonRoundTrip() throws MetadataFormatException {
+ PackageMetadata metadata = sampleMetadata();
+ byte[] bytes = PackageMetadataUtil.toBytes(metadata, true);
+ assertEquals(bytes[0], (byte) '{', "JSON-encoded metadata must start
with '{'");
+ PackageMetadata roundTripped = PackageMetadataUtil.fromBytes(bytes,
true);
+ assertEquals(roundTripped, metadata);
+ }
- byte[] metadataSerialized = PackageMetadataUtil.toBytes(metadata);
+ @Test
+ public void testLegacyRoundTrip() throws MetadataFormatException {
+ PackageMetadata metadata = sampleMetadata();
+ byte[] bytes = PackageMetadataUtil.toBytes(metadata, false);
+ assertEquals(bytes[0], (byte) 0xAC, "Java-serialized stream must start
with STREAM_MAGIC byte 0xAC");
+ assertEquals(bytes[1], (byte) 0xED, "Java-serialized stream must start
with STREAM_MAGIC byte 0xED");
+ PackageMetadata roundTripped = PackageMetadataUtil.fromBytes(bytes,
true);
+ assertEquals(roundTripped, metadata);
+ }
+
+ @Test
+ public void testJsonReadableWhenLegacyDisabled() throws
MetadataFormatException {
+ PackageMetadata metadata = sampleMetadata();
+ byte[] jsonBytes = PackageMetadataUtil.toBytes(metadata, true);
+ // JSON is safe regardless of the legacy flag.
+ PackageMetadata roundTripped =
PackageMetadataUtil.fromBytes(jsonBytes, false);
+ assertEquals(roundTripped, metadata);
+ }
+ @Test
+ public void testLegacyRejectedWhenLegacyDisabled() {
+ byte[] legacyBytes = PackageMetadataUtil.toBytes(sampleMetadata(),
false);
try {
- PackageMetadata deSerializedMetadata =
PackageMetadataUtil.fromBytes(metadataSerialized);
- Assert.assertEquals(metadata, deSerializedMetadata);
- } catch (PackagesManagementException.MetadataFormatException e) {
- Assert.fail("should not throw any exception");
+ PackageMetadataUtil.fromBytes(legacyBytes, false);
+ throw new AssertionError("Expected MetadataFormatException");
+ } catch (MetadataFormatException ex) {
+
assertTrue(ex.getMessage().contains("packagesManagementAllowLegacyJavaSerialization"),
+ "Rejection message should name the config flag to flip: "
+ ex.getMessage());
}
+ }
- try {
- byte[] failedMetadataSerialized = "wrong package
metadata".getBytes();
- PackageMetadata deSerializedMetadata =
PackageMetadataUtil.fromBytes(failedMetadataSerialized);
- Assert.fail("should throw the metadata format exception");
- } catch (PackagesManagementException.MetadataFormatException e) {
- // expected error
+ @Test
+ public void testFilterRejectsUnsafeClass() throws Exception {
+ // Build a valid Java serialization stream whose top-level object is
NOT a PackageMetadata
+ // and whose class is outside the filter's allowlist. The JEP 290
filter must reject the
+ // class descriptor BEFORE ObjectInputStream instantiates it.
+ ArrayList<String> disallowed = new ArrayList<>();
+ disallowed.add("gadget-chain-entry");
+ byte[] bytes;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(disallowed);
+ oos.flush();
+ bytes = baos.toByteArray();
}
+ // Starts with the Java magic — fromBytes will dispatch to the legacy
reader.
+ assertEquals(bytes[0], (byte) 0xAC);
+ assertEquals(bytes[1], (byte) 0xED);
+ assertThrows(MetadataFormatException.class,
+ () -> PackageMetadataUtil.fromBytes(bytes, true));
+ }
+
+ @Test
+ public void testUnknownFormatRejected() {
+ assertThrows(MetadataFormatException.class,
+ () -> PackageMetadataUtil.fromBytes("plain text".getBytes(),
true));
+ }
+
+ @Test
+ public void testEmptyRejected() {
+ assertThrows(MetadataFormatException.class,
+ () -> PackageMetadataUtil.fromBytes(new byte[0], true));
+ assertThrows(MetadataFormatException.class,
+ () -> PackageMetadataUtil.fromBytes(null, true));
}
}
diff --git
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
index 41f31f3497d..557f6129260 100644
---
a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
+++
b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/impl/PackagesManagementImplTest.java
@@ -125,7 +125,7 @@ public class PackagesManagementImplTest {
.contact("[email protected]")
.description("A mocked test package")
.createTime(System.currentTimeMillis()).build();
- try (ByteArrayInputStream inputStream = new
ByteArrayInputStream(PackageMetadataUtil.toBytes(metadata))) {
+ try (ByteArrayInputStream inputStream = new
ByteArrayInputStream(PackageMetadataUtil.toBytes(metadata, true))) {
packagesManagement.upload(packageName, metadata,
inputStream).get();
} catch (Exception e) {
Assert.fail("should not throw any exception");
@@ -142,7 +142,7 @@ public class PackagesManagementImplTest {
// download an existent package should succeed
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
{
packagesManagement.download(packageName, outputStream).get();
- PackageMetadata getPackage =
PackageMetadataUtil.fromBytes(outputStream.toByteArray());
+ PackageMetadata getPackage =
PackageMetadataUtil.fromBytes(outputStream.toByteArray(), true);
Assert.assertEquals(metadata, getPackage);
} catch (Exception e) {
Assert.fail("should not throw any exception");