This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f8fa1a8a57 Expose msg length info to metadata (#14688)
f8fa1a8a57 is described below

commit f8fa1a8a572f7668058bc29de23da586dc40c855
Author: lnbest0707 <106711887+lnbest0707-u...@users.noreply.github.com>
AuthorDate: Thu Jan 2 10:47:17 2025 -0800

    Expose msg length info to metadata (#14688)
    
    * Expose msg length info to metadata
    
    * Address comment
---
 .../main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java   | 2 ++
 .../java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java    | 3 ++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
index 127ecfe121..35721fcb82 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java
@@ -30,6 +30,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
   public static final String KEY = "__key";
   public static final String HEADER_KEY_PREFIX = "__header$";
   public static final String METADATA_KEY_PREFIX = "__metadata$";
+  public static final String RECORD_SERIALIZED_VALUE_SIZE_KEY = 
METADATA_KEY_PREFIX + "recordSerializedValueSize";
 
   private final StreamMessageDecoder _valueDecoder;
   private final GenericRow _reuse = new GenericRow();
@@ -65,6 +66,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder {
           if (metadata.getRecordMetadata() != null) {
             metadata.getRecordMetadata().forEach((key, value) -> 
row.putValue(METADATA_KEY_PREFIX + key, value));
           }
+          row.putValue(RECORD_SERIALIZED_VALUE_SIZE_KEY, message.getLength());
         }
         return new StreamDataDecoderResult(row, null);
       } else {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
index f9f6aafc11..a2ddec6d99 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java
@@ -71,11 +71,12 @@ public class StreamDataDecoderImplTest {
     Assert.assertNotNull(result.getResult());
 
     GenericRow row = result.getResult();
-    Assert.assertEquals(row.getFieldToValueMap().size(), 4);
+    Assert.assertEquals(row.getFieldToValueMap().size(), 5);
     Assert.assertEquals(row.getValue(NAME_FIELD), value);
     Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed 
to decode record key");
     Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + 
AGE_HEADER_KEY), 3);
     Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX 
+ SEQNO_RECORD_METADATA), "1");
+    
Assert.assertEquals(row.getValue(StreamDataDecoderImpl.RECORD_SERIALIZED_VALUE_SIZE_KEY),
 value.length());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to