This is an automated email from the ASF dual-hosted git repository. lqc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f8fa1a8a57 Expose msg length info to metadata (#14688) f8fa1a8a57 is described below commit f8fa1a8a572f7668058bc29de23da586dc40c855 Author: lnbest0707 <106711887+lnbest0707-u...@users.noreply.github.com> AuthorDate: Thu Jan 2 10:47:17 2025 -0800 Expose msg length info to metadata (#14688) * Expose msg length info to metadata * Address comment --- .../main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java | 2 ++ .../java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java index 127ecfe121..35721fcb82 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java @@ -30,6 +30,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder { public static final String KEY = "__key"; public static final String HEADER_KEY_PREFIX = "__header$"; public static final String METADATA_KEY_PREFIX = "__metadata$"; + public static final String RECORD_SERIALIZED_VALUE_SIZE_KEY = METADATA_KEY_PREFIX + "recordSerializedValueSize"; private final StreamMessageDecoder _valueDecoder; private final GenericRow _reuse = new GenericRow(); @@ -65,6 +66,7 @@ public class StreamDataDecoderImpl implements StreamDataDecoder { if (metadata.getRecordMetadata() != null) { metadata.getRecordMetadata().forEach((key, value) -> row.putValue(METADATA_KEY_PREFIX + key, value)); } + row.putValue(RECORD_SERIALIZED_VALUE_SIZE_KEY, message.getLength()); } return new StreamDataDecoderResult(row, null); } else { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java index f9f6aafc11..a2ddec6d99 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataDecoderImplTest.java @@ -71,11 +71,12 @@ public class StreamDataDecoderImplTest { Assert.assertNotNull(result.getResult()); GenericRow row = result.getResult(); - Assert.assertEquals(row.getFieldToValueMap().size(), 4); + Assert.assertEquals(row.getFieldToValueMap().size(), 5); Assert.assertEquals(row.getValue(NAME_FIELD), value); Assert.assertEquals(row.getValue(StreamDataDecoderImpl.KEY), key, "Failed to decode record key"); Assert.assertEquals(row.getValue(StreamDataDecoderImpl.HEADER_KEY_PREFIX + AGE_HEADER_KEY), 3); Assert.assertEquals(row.getValue(StreamDataDecoderImpl.METADATA_KEY_PREFIX + SEQNO_RECORD_METADATA), "1"); + Assert.assertEquals(row.getValue(StreamDataDecoderImpl.RECORD_SERIALIZED_VALUE_SIZE_KEY), value.length()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org