apoorvmittal10 commented on code in PR #15148:
URL: https://github.com/apache/kafka/pull/15148#discussion_r1445133177


##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -175,16 +182,44 @@ public static boolean validateRequiredResourceLabels(Map<String, String> metadat
     }
 
     public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
-        // TODO: Support compression in client telemetry.
+        if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
+            // Broker is providing the compression types in order of preference. Grab the
+            // first one.
+            return acceptedCompressionTypes.get(0);
+        }
         return CompressionType.NONE;
     }
 
     public static ByteBuffer compress(byte[] raw, CompressionType compressionType) {
-        // TODO: Support compression in client telemetry.
-        if (compressionType == CompressionType.NONE) {
-            return ByteBuffer.wrap(raw);
-        } else {
-            throw new UnsupportedOperationException("Compression is not supported");
+        try {
+            try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
+                try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
+                    out.write(raw);
+                    out.flush();
+                }
+                compressedOut.buffer().flip();
+                return ByteBuffer.wrap(Utils.toArray(compressedOut.buffer()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to compress metrics data", e);
+        }
+    }
+
+    public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
+        ByteBuffer data = ByteBuffer.wrap(metrics);
+        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+            ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+
+            byte[] bytes = new byte[data.capacity() * 2];
+            int nRead;
+            while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
+                out.write(bytes, 0, nRead);
+            }
+
+            out.flush();
+            return ByteBuffer.wrap(out.toByteArray());

Review Comment:
   Avoided using the code below from Utils because the required ByteBuffer allocation is not known up front, and `Utils.readFully` only fills data up to the buffer's capacity. That would mean repeatedly resizing the ByteBuffer and re-reading, effectively duplicating the loop above.
   
   `Utils.readFully` works in MemoryRecords because the uncompressed data size comes along with the Produce request, so the buffer allocation there is exact.
   
   ```
   ByteBuffer decompressedData = ByteBuffer.allocate(<some capacity>);
   Utils.readFully(in, decompressedData);
   return (ByteBuffer) decompressedData.flip();
   ```
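   A standalone sketch of the same grow-as-you-read loop, using `java.util.zip` GZIP streams as a stand-in for Kafka's `CompressionType.wrapForInput`/`wrapForOutput` wrappers (a hypothetical substitution so the example is self-contained; the PR itself uses the Kafka compression APIs):

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.zip.GZIPInputStream;
   import java.util.zip.GZIPOutputStream;

   public class DecompressSketch {

       // Read the stream in chunks and let the output grow, since the
       // uncompressed size is unknown ahead of time (the rationale above for
       // not using a fixed-capacity buffer with Utils.readFully).
       static ByteBuffer decompress(byte[] compressed) throws IOException {
           try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
                ByteArrayOutputStream out = new ByteArrayOutputStream()) {
               byte[] chunk = new byte[compressed.length * 2];
               int nRead;
               while ((nRead = in.read(chunk, 0, chunk.length)) != -1) {
                   out.write(chunk, 0, nRead);
               }
               return ByteBuffer.wrap(out.toByteArray());
           }
       }

       public static void main(String[] args) throws IOException {
           byte[] raw = "telemetry payload".getBytes(StandardCharsets.UTF_8);

           // Compress with GZIP, then round-trip through decompress().
           ByteArrayOutputStream compressedOut = new ByteArrayOutputStream();
           try (GZIPOutputStream gz = new GZIPOutputStream(compressedOut)) {
               gz.write(raw);
           }
           ByteBuffer round = decompress(compressedOut.toByteArray());
           System.out.println(new String(round.array(), StandardCharsets.UTF_8));
       }
   }
   ```

   The chunk size seeded from the compressed length is just a starting heuristic, mirroring the `data.capacity() * 2` choice in the PR; `ByteArrayOutputStream` handles any further growth.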



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
