This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 4929f9d6600 MINOR: Fixed metrics decompression (#22327)
4929f9d6600 is described below

commit 4929f9d660067a795e5340ad75252b5b3f99110d
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue May 19 18:34:06 2026 +0100

    MINOR: Fixed metrics decompression (#22327)
    
    Minor fix for fixing decompression.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../common/requests/PushTelemetryRequest.java      |  4 +--
 .../telemetry/internals/ClientTelemetryUtils.java  | 12 +++++++--
 .../common/requests/PushTelemetryRequestTest.java  |  2 +-
 .../internals/ClientTelemetryUtilsTest.java        | 30 +++++++++++++++++++++-
 .../apache/kafka/server/ClientMetricsManager.java  |  2 +-
 .../metrics/ClientTelemetryExporterPlugin.java     |  8 +++---
 .../metrics/DefaultClientTelemetryPayload.java     |  4 +--
 .../kafka/server/ClientMetricsManagerTest.java     |  2 +-
 .../metrics/ClientMetricsTelemetryPluginTest.java  |  8 +++---
 .../kafka/server/metrics/ClientTelemetryTest.java  |  6 +++--
 10 files changed, 58 insertions(+), 20 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
index b583dc0e561..b3502677dc5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
@@ -85,10 +85,10 @@ public class PushTelemetryRequest extends AbstractRequest {
         return OTLP_CONTENT_TYPE;
     }
 
-    public ByteBuffer metricsData() {
+    public ByteBuffer metricsData(int maxDecompressedBytes) {
         CompressionType cType = 
CompressionType.forId(this.data.compressionType());
         return (cType == CompressionType.NONE) ?
-            this.data.metrics() : 
ClientTelemetryUtils.decompress(this.data.metrics(), cType);
+            this.data.metrics() : 
ClientTelemetryUtils.decompress(this.data.metrics(), cType, 
maxDecompressedBytes);
     }
 
     public static PushTelemetryRequest parse(Readable readable, short version) 
{
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index cafa1bcf402..5445e767122 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.telemetry.internals;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
 import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.CompressionType;
@@ -52,6 +53,8 @@ public class ClientTelemetryUtils {
 
     public static final Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS 
= k -> true;
 
+    private static final int DECOMPRESS_READ_BUFFER_BYTES = 8 * 1024;
+
     /**
      * Examine the response data and handle different error code accordingly:
      *
@@ -212,13 +215,18 @@ public class ClientTelemetryUtils {
         }
     }
 
-    public static ByteBuffer decompress(ByteBuffer metrics, CompressionType 
compressionType) {
+    public static ByteBuffer decompress(ByteBuffer metrics, CompressionType 
compressionType, int maxDecompressedBytes) {
         Compression compression = Compression.of(compressionType).build();
         try (InputStream in = compression.wrapForInput(metrics, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
             ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
-            byte[] bytes = new byte[metrics.limit() * 2];
+            byte[] bytes = new byte[Math.min(metrics.limit() * 2, 
DECOMPRESS_READ_BUFFER_BYTES)];
             int nRead;
+            int totalRead = 0;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+                totalRead += nRead;
+                if (totalRead > maxDecompressedBytes) {
+                    throw new TelemetryTooLargeException("Decompressed 
telemetry metrics exceed maximum allowed size: " + maxDecompressedBytes);
+                }
                 out.write(bytes, 0, nRead);
             }
             out.buffer().flip();
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
index e133fbac3c8..198b8b4340e 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
@@ -62,7 +62,7 @@ public class PushTelemetryRequestTest {
         MetricsData metricsData = getMetricsData();
         PushTelemetryRequest req = getPushTelemetryRequest(metricsData, 
compressionType);
 
-        ByteBuffer receivedMetricsBuffer = req.metricsData();
+        ByteBuffer receivedMetricsBuffer = req.metricsData(1024 * 1024);
         assertNotNull(receivedMetricsBuffer);
         assertTrue(receivedMetricsBuffer.capacity() > 0);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 2bc0767dd88..7ca1a41fb4c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.telemetry.internals;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.CompressionType;
 import org.apache.kafka.common.utils.Utils;
@@ -154,7 +155,7 @@ public class ClientTelemetryUtilsTest {
         } else {
             assertArrayEquals(raw, Utils.toArray(compressed));
         }
-        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+        ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType, 1024 * 1024);
         assertNotNull(decompressed);
         byte[] actualResult = Utils.toArray(decompressed);
         assertArrayEquals(raw, actualResult);
@@ -191,4 +192,31 @@ public class ClientTelemetryUtilsTest {
 
         return builder.build();
     }
+
+    @Test
+    public void testDecompressExceedingMaxSizeThrows() throws IOException {
+        // Compress a large payload using the existing compress API (via 
MetricsData)
+        // then verify decompression with a small limit throws
+        MetricsData metricsData = getMetricsData();
+        ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, 
CompressionType.GZIP);
+        byte[] raw = metricsData.toByteArray();
+
+        // Set limit smaller than the actual decompressed size
+        int smallLimit = raw.length - 1;
+        TelemetryTooLargeException ex = 
assertThrows(TelemetryTooLargeException.class,
+            () -> ClientTelemetryUtils.decompress(compressed.duplicate(), 
CompressionType.GZIP, smallLimit));
+        assertTrue(ex.getMessage().contains("Decompressed telemetry metrics 
exceed maximum allowed size: " + smallLimit));
+    }
+
+    @Test
+    public void testDecompressWithPayloadSizeSucceeds() throws IOException {
+        MetricsData metricsData = getMetricsData();
+        byte[] raw = metricsData.toByteArray();
+        ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, 
CompressionType.GZIP);
+
+        // Set limit to exact limit prior compression.
+        ByteBuffer result = ClientTelemetryUtils.decompress(compressed, 
CompressionType.GZIP, raw.length);
+        assertNotNull(result);
+        assertArrayEquals(raw, Utils.toArray(result));
+    }
 }
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java 
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index 87f2234da9a..c9a9e35453e 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -214,7 +214,7 @@ public class ClientMetricsManager implements AutoCloseable {
         if (metrics != null && metrics.limit() > 0) {
             try {
                 long exportTimeStartMs = time.hiResClockMs();
-                clientTelemetryExporterPlugin.exportMetrics(requestContext, 
request, clientInstance.pushIntervalMs());
+                clientTelemetryExporterPlugin.exportMetrics(requestContext, 
request, clientInstance.pushIntervalMs(), clientTelemetryMaxBytes);
                 clientMetricsStats.recordPluginExport(clientInstanceId, 
time.hiResClockMs() - exportTimeStartMs);
             } catch (Throwable exception) {
                 clientMetricsStats.recordPluginErrorCount(clientInstanceId);
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
index 0836dd730da..38b0ac5ac47 100644
--- 
a/server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
@@ -53,12 +53,12 @@ public class ClientTelemetryExporterPlugin {
         exporters.add(exporter);
     }
 
-    public DefaultClientTelemetryPayload getPayLoad(PushTelemetryRequest 
request) {
-        return new DefaultClientTelemetryPayload(request);
+    public DefaultClientTelemetryPayload getPayLoad(PushTelemetryRequest 
request, int maxDecompressedBytes) {
+        return new DefaultClientTelemetryPayload(request, 
maxDecompressedBytes);
     }
 
-    public void exportMetrics(RequestContext context, PushTelemetryRequest 
request, int pushIntervalMs) {
-        DefaultClientTelemetryPayload payload = getPayLoad(request);
+    public void exportMetrics(RequestContext context, PushTelemetryRequest 
request, int pushIntervalMs, int maxDecompressedBytes) {
+        DefaultClientTelemetryPayload payload = getPayLoad(request, 
maxDecompressedBytes);
 
         // Export to deprecated receivers
         for (ClientTelemetryReceiver receiver : receivers) {
diff --git 
a/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
 
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
index f90f473b1ed..0db7fcb6de1 100644
--- 
a/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
@@ -32,11 +32,11 @@ public class DefaultClientTelemetryPayload implements 
ClientTelemetryPayload {
     private final String metricsContentType;
     private final ByteBuffer metricsData;
 
-    DefaultClientTelemetryPayload(PushTelemetryRequest request) {
+    DefaultClientTelemetryPayload(PushTelemetryRequest request, int 
maxDecompressedBytes) {
         this.clientInstanceId = request.data().clientInstanceId();
         this.isClientTerminating = request.data().terminating();
         this.metricsContentType = request.metricsContentType();
-        this.metricsData = request.metricsData();
+        this.metricsData = request.metricsData(maxDecompressedBytes);
     }
 
     @Override
diff --git 
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index 3a5d41a592b..f6420a0edde 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -1079,7 +1079,7 @@ public class ClientMetricsManagerTest {
     @Test
     public void testPushTelemetryPluginException() throws Exception {
         ClientTelemetryExporterPlugin receiverPlugin = 
Mockito.mock(ClientTelemetryExporterPlugin.class);
-        Mockito.doThrow(new RuntimeException("test 
exception")).when(receiverPlugin).exportMetrics(Mockito.any(), Mockito.any(), 
Mockito.anyInt());
+        Mockito.doThrow(new RuntimeException("test 
exception")).when(receiverPlugin).exportMetrics(Mockito.any(), Mockito.any(), 
Mockito.anyInt(), Mockito.anyInt());
 
         try (
                 Metrics kafkaMetrics = new Metrics();
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java
 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java
index 958f21a12a2..f2803883cc6 100644
--- 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java
@@ -57,7 +57,7 @@ public class ClientMetricsTelemetryPluginTest {
 
         byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8);
         
clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
5000);
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
5000, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
 
         assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
         assertEquals(1, telemetryReceiver.metricsData.size());
@@ -77,7 +77,7 @@ public class ClientMetricsTelemetryPluginTest {
         byte[] metrics = "test-metrics-new".getBytes(StandardCharsets.UTF_8);
         int pushIntervalMs = 10000;
         
clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs);
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
 
         assertEquals(1, telemetryExporter.exportMetricsInvokedCount);
         assertEquals(1, telemetryExporter.metricsData.size());
@@ -96,7 +96,7 @@ public class ClientMetricsTelemetryPluginTest {
         byte[] metrics = "test-metrics-both".getBytes(StandardCharsets.UTF_8);
         int pushIntervalMs = 15000;
         
clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs);
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
 
         // Both should be called since they are separate objects
         assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
@@ -122,7 +122,7 @@ public class ClientMetricsTelemetryPluginTest {
         byte[] metrics = "test-metrics-dual".getBytes(StandardCharsets.UTF_8);
         int pushIntervalMs = 12000;
         
clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs);
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
 
         // Only the exporter should be called (receiver should not be invoked)
         assertEquals(0, dualImpl.getReceiver().exportMetricsInvokedCount);
diff --git 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java
index c6f5ff25563..dcda4b5f19f 100644
--- 
a/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java
@@ -50,7 +50,8 @@ public class ClientTelemetryTest {
 
         byte[] metrics = 
"test-metrics-multiple".getBytes(StandardCharsets.UTF_8);
         
clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
5000);
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(),
+            5000, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
 
         // Verify both receivers were called
         assertEquals(1, receiver1.exportMetricsInvokedCount);
@@ -71,7 +72,8 @@ public class ClientTelemetryTest {
         byte[] metrics = 
"test-metrics-multiple-new".getBytes(StandardCharsets.UTF_8);
         int pushIntervalMs = 20000;
         
clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 
pushIntervalMs);
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(),
+            pushIntervalMs, MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
 
         // Verify both exporters were called
         assertEquals(1, exporter1.exportMetricsInvokedCount);

Reply via email to