apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445133177
##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat
     }
     public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }
     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try {
+            try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+                try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(raw);
+                    out.flush();
+                }
+                compressedOut.buffer().flip();
+                return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to compress metrics data", e);
+        }
+    }
+
+    public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+        ByteBuffer data = ByteBuffer.wrap(metrics);
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+            byte[] bytes = new byte[data.capacity() * 2];
+            int nRead;
+            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+                out.write(bytes, 0, nRead);
+            }
+
+            out.flush();
+            return ByteBuffer.wrap(out.toByteArray());
Review Comment:
I avoided using the code below from Utils because the size of the ByteBuffer
to allocate is not known in advance, and `Utils.readFully` only fills the
buffer up to its capacity. We would therefore have to resize the ByteBuffer
ourselves, which amounts to the same work as the read loop above.
`Utils.readFully` works in MemoryRecords because the uncompressed data size
comes along with the Produce request, so the buffer allocation there is
accurate.
```
ByteBuffer decompressedData = ByteBuffer.allocate(<some capacity>);
Utils.readFully(in, decompressedData);
return (ByteBuffer) decompressedData.flip();
```
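
For illustration, here is a minimal standalone sketch of the read-loop approach taken in `decompress`, where the decompressed size is unknown up front. It is not the PR code: the class name `DecompressSketch` and the helpers `gzip` and `gunzip` are hypothetical, and the JDK's `GZIPOutputStream` / `GZIPInputStream` stand in for `CompressionType.wrapForOutput` / `wrapForInput` so that the example runs without Kafka on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: JDK GZIP streams are an assumption standing in for
// Kafka's CompressionType stream wrappers.
public class DecompressSketch {

    // Compress a byte array; closing the GZIP stream writes the trailer.
    public static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(compressed)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to compress data", e);
        }
        return compressed.toByteArray();
    }

    // Decompress without knowing the output size: read fixed-size chunks
    // into a growable ByteArrayOutputStream until the stream reports EOF.
    public static ByteBuffer gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            // The chunk size is only a read granularity, not a bound on the output.
            byte[] chunk = new byte[Math.max(64, compressed.length * 2)];
            int nRead;
            while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, nRead);
            }
            return ByteBuffer.wrap(out.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to decompress data", e);
        }
    }
}
```

Calling `DecompressSketch.gunzip(DecompressSketch.gzip(bytes))` should give back a buffer with the same contents as `bytes`, however well the input compresses. A fixed-capacity `ByteBuffer` filled in a single pass cannot guarantee that unless the capacity is guessed correctly.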