This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new dd43f5f3eec MINOR: Fixed metrics decompression (#22327)
dd43f5f3eec is described below
commit dd43f5f3eecaf41b55030cac40b12e7176fb524d
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue May 19 18:34:06 2026 +0100
MINOR: Fixed metrics decompression (#22327)
Minor fix for fixing decompression.
Reviewers: Andrew Schofield <[email protected]>
---
.../common/requests/PushTelemetryRequest.java | 4 +--
.../telemetry/internals/ClientTelemetryUtils.java | 12 +++++++--
.../common/requests/PushTelemetryRequestTest.java | 2 +-
.../internals/ClientTelemetryUtilsTest.java | 30 +++++++++++++++++++++-
.../apache/kafka/server/ClientMetricsManager.java | 2 +-
.../metrics/ClientMetricsReceiverPlugin.java | 8 +++---
.../metrics/DefaultClientTelemetryPayload.java | 4 +--
.../kafka/server/ClientMetricsManagerTest.java | 2 +-
.../metrics/ClientMetricsReceiverPluginTest.java | 2 +-
9 files changed, 51 insertions(+), 15 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
index d58730e5b3f..94e60bd905a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
@@ -85,10 +85,10 @@ public class PushTelemetryRequest extends AbstractRequest {
return OTLP_CONTENT_TYPE;
}
- public ByteBuffer metricsData() {
+ public ByteBuffer metricsData(int maxDecompressedBytes) {
CompressionType cType =
CompressionType.forId(this.data.compressionType());
return (cType == CompressionType.NONE) ?
- this.data.metrics() :
ClientTelemetryUtils.decompress(this.data.metrics(), cType);
+ this.data.metrics() :
ClientTelemetryUtils.decompress(this.data.metrics(), cType,
maxDecompressedBytes);
}
public static PushTelemetryRequest parse(Readable readable, short version)
{
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 3c555afb3b0..a60caefcefe 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
@@ -51,6 +52,8 @@ public class ClientTelemetryUtils {
public static final Predicate<? super MetricKeyable> SELECTOR_ALL_METRICS
= k -> true;
+ private static final int DECOMPRESS_READ_BUFFER_BYTES = 8 * 1024;
+
/**
* Examine the response data and handle different error code accordingly:
*
@@ -201,13 +204,18 @@ public class ClientTelemetryUtils {
}
}
- public static ByteBuffer decompress(ByteBuffer metrics, CompressionType
compressionType) {
+ public static ByteBuffer decompress(ByteBuffer metrics, CompressionType
compressionType, int maxDecompressedBytes) {
Compression compression = Compression.of(compressionType).build();
try (InputStream in = compression.wrapForInput(metrics,
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
- byte[] bytes = new byte[metrics.limit() * 2];
+ byte[] bytes = new byte[Math.min(metrics.limit() * 2,
DECOMPRESS_READ_BUFFER_BYTES)];
int nRead;
+ int totalRead = 0;
while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+ totalRead += nRead;
+ if (totalRead > maxDecompressedBytes) {
+ throw new TelemetryTooLargeException("Decompressed
telemetry metrics exceed maximum allowed size: " + maxDecompressedBytes);
+ }
out.write(bytes, 0, nRead);
}
out.buffer().flip();
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
index beed3e49102..3a9e97420ba 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
@@ -62,7 +62,7 @@ public class PushTelemetryRequestTest {
MetricsData metricsData = getMetricsData();
PushTelemetryRequest req = getPushTelemetryRequest(metricsData,
compressionType);
- ByteBuffer receivedMetricsBuffer = req.metricsData();
+ ByteBuffer receivedMetricsBuffer = req.metricsData(1024 * 1024);
assertNotNull(receivedMetricsBuffer);
assertTrue(receivedMetricsBuffer.capacity() > 0);
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 41679bed3f7..66dfd2e3916 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
@@ -141,7 +142,7 @@ public class ClientTelemetryUtilsTest {
} else {
assertArrayEquals(raw, Utils.toArray(compressed));
}
- ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed,
compressionType);
+ ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed,
compressionType, 1024 * 1024);
assertNotNull(decompressed);
byte[] actualResult = Utils.toArray(decompressed);
assertArrayEquals(raw, actualResult);
@@ -178,4 +179,31 @@ public class ClientTelemetryUtilsTest {
return builder.build();
}
+
+ @Test
+ public void testDecompressExceedingMaxSizeThrows() throws IOException {
+ // Compress a large payload using the existing compress API (via
MetricsData)
+ // then verify decompression with a small limit throws
+ MetricsData metricsData = getMetricsData();
+ ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData,
CompressionType.GZIP);
+ byte[] raw = metricsData.toByteArray();
+
+ // Set limit smaller than the actual decompressed size
+ int smallLimit = raw.length - 1;
+ TelemetryTooLargeException ex =
assertThrows(TelemetryTooLargeException.class,
+ () -> ClientTelemetryUtils.decompress(compressed.duplicate(),
CompressionType.GZIP, smallLimit));
+ assertTrue(ex.getMessage().contains("Decompressed telemetry metrics
exceed maximum allowed size: " + smallLimit));
+ }
+
+ @Test
+ public void testDecompressWithPayloadSizeSucceeds() throws IOException {
+ MetricsData metricsData = getMetricsData();
+ byte[] raw = metricsData.toByteArray();
+ ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData,
CompressionType.GZIP);
+
+ // Set limit to exact limit prior compression.
+ ByteBuffer result = ClientTelemetryUtils.decompress(compressed,
CompressionType.GZIP, raw.length);
+ assertNotNull(result);
+ assertArrayEquals(raw, Utils.toArray(result));
+ }
}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index d7343098b4e..ddc96b22132 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -214,7 +214,7 @@ public class ClientMetricsManager implements AutoCloseable {
if (metrics != null && metrics.limit() > 0) {
try {
long exportTimeStartMs = time.hiResClockMs();
- receiverPlugin.exportMetrics(requestContext, request);
+ receiverPlugin.exportMetrics(requestContext, request,
clientTelemetryMaxBytes);
clientMetricsStats.recordPluginExport(clientInstanceId,
time.hiResClockMs() - exportTimeStartMs);
} catch (Throwable exception) {
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
index c608510350a..137cff0c1b3 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
@@ -44,12 +44,12 @@ public class ClientMetricsReceiverPlugin {
receivers.add(receiver);
}
- public DefaultClientTelemetryPayload getPayLoad(PushTelemetryRequest
request) {
- return new DefaultClientTelemetryPayload(request);
+ public DefaultClientTelemetryPayload getPayLoad(PushTelemetryRequest
request, int maxDecompressedBytes) {
+ return new DefaultClientTelemetryPayload(request,
maxDecompressedBytes);
}
- public void exportMetrics(RequestContext context, PushTelemetryRequest
request) {
- DefaultClientTelemetryPayload payload = getPayLoad(request);
+ public void exportMetrics(RequestContext context, PushTelemetryRequest
request, int maxDecompressedBytes) {
+ DefaultClientTelemetryPayload payload = getPayLoad(request,
maxDecompressedBytes);
for (ClientTelemetryReceiver receiver : receivers) {
receiver.exportMetrics(context, payload);
}
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
index f90f473b1ed..0db7fcb6de1 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryPayload.java
@@ -32,11 +32,11 @@ public class DefaultClientTelemetryPayload implements
ClientTelemetryPayload {
private final String metricsContentType;
private final ByteBuffer metricsData;
- DefaultClientTelemetryPayload(PushTelemetryRequest request) {
+ DefaultClientTelemetryPayload(PushTelemetryRequest request, int
maxDecompressedBytes) {
this.clientInstanceId = request.data().clientInstanceId();
this.isClientTerminating = request.data().terminating();
this.metricsContentType = request.metricsContentType();
- this.metricsData = request.metricsData();
+ this.metricsData = request.metricsData(maxDecompressedBytes);
}
@Override
diff --git
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index c5b13eefbe4..1d45b03117c 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -1079,7 +1079,7 @@ public class ClientMetricsManagerTest {
@Test
public void testPushTelemetryPluginException() throws Exception {
ClientMetricsReceiverPlugin receiverPlugin =
Mockito.mock(ClientMetricsReceiverPlugin.class);
- Mockito.doThrow(new RuntimeException("test
exception")).when(receiverPlugin).exportMetrics(Mockito.any(), Mockito.any());
+ Mockito.doThrow(new RuntimeException("test
exception")).when(receiverPlugin).exportMetrics(Mockito.any(), Mockito.any(),
Mockito.anyInt());
try (
Metrics kafkaMetrics = new Metrics();
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
index 698e8720954..73f59eda511 100644
---
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
@@ -54,7 +54,7 @@ public class ClientMetricsReceiverPluginTest {
byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8);
clientMetricsReceiverPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
- new PushTelemetryRequest.Builder(new
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build());
+ new PushTelemetryRequest.Builder(new
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(),
MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_DEFAULT);
assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
assertEquals(1, telemetryReceiver.metricsData.size());