jackluo923 commented on code in PR #14496:
URL: https://github.com/apache/pinot/pull/14496#discussion_r1850974494


##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -150,8 +150,8 @@ private BytesStreamMessage extractStreamMessage(Record record, String shardId) {
     String sequenceNumber = record.sequenceNumber();
     KinesisPartitionGroupOffset offset = new KinesisPartitionGroupOffset(shardId, sequenceNumber);
     // NOTE: Use the same offset as next offset because the consumer starts consuming AFTER the start sequence number.
-    StreamMessageMetadata.Builder builder =
-        new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setOffset(offset, offset);
+    StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
+        .setSerializedValueSize(record.data().asByteArray().length).setOffset(offset, offset);

Review Comment:
   `record.data().asByteArray().length` does not require an additional null check: the same expression is already evaluated several lines above, so a null `record.data()` would already have failed there.

   The earlier line is:
   ```
   byte[] value = record.data().asByteArray();
   ```
   
   I have modified the change to reuse that array instead of converting the record data a second time:
   ```
   .setSerializedValueSize(value.length)
   ```
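The point of this comment (extract the payload once, then reuse its length) can be illustrated with a small, self-contained Java sketch. `FakeRecord`, `Builder`, and `Metadata` are hypothetical stand-ins for the Kinesis `Record` and Pinot's `StreamMessageMetadata.Builder`, not their real APIs. `FakeRecord.dataAsByteArray()` deliberately returns a fresh copy on every call to model the cost of a second conversion; whether the real AWS SDK accessor copies is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch only: FakeRecord, Builder and Metadata are hypothetical stand-ins,
// not the AWS SDK Record or Pinot's StreamMessageMetadata.Builder.
public class SerializedSizeSketch {

  /** Hypothetical record whose accessor returns a fresh copy of the payload on every call. */
  static final class FakeRecord {
    private final byte[] payload;
    int copies = 0; // number of times the payload has been copied

    FakeRecord(byte[] payload) {
      this.payload = payload;
    }

    byte[] dataAsByteArray() {
      copies++;
      return Arrays.copyOf(payload, payload.length);
    }
  }

  /** Hypothetical immutable metadata produced by the builder. */
  static final class Metadata {
    final long recordIngestionTimeMs;
    final int serializedValueSize;

    Metadata(long recordIngestionTimeMs, int serializedValueSize) {
      this.recordIngestionTimeMs = recordIngestionTimeMs;
      this.serializedValueSize = serializedValueSize;
    }
  }

  /** Hypothetical fluent builder in the same style as the reviewed code. */
  static final class Builder {
    private long recordIngestionTimeMs;
    private int serializedValueSize = -1; // -1 means "not set"

    Builder setRecordIngestionTimeMs(long timeMs) {
      this.recordIngestionTimeMs = timeMs;
      return this;
    }

    Builder setSerializedValueSize(int size) {
      this.serializedValueSize = size;
      return this;
    }

    Metadata build() {
      return new Metadata(recordIngestionTimeMs, serializedValueSize);
    }
  }

  /** Extracts the payload once, then reuses its length for the size metadata. */
  static Metadata extract(FakeRecord record, long timestamp) {
    byte[] value = record.dataAsByteArray(); // single extraction, like `byte[] value = ...` above
    return new Builder()
        .setRecordIngestionTimeMs(timestamp)
        .setSerializedValueSize(value.length) // reuse instead of converting the data again
        .build();
  }

  public static void main(String[] args) {
    FakeRecord record = new FakeRecord("hello".getBytes(StandardCharsets.UTF_8));
    Metadata metadata = extract(record, 1_000L);
    System.out.println(metadata.serializedValueSize + " bytes, copies=" + record.copies);
  }
}
```

Running `main` prints `5 bytes, copies=1`: the payload is converted once and its length is reused, which is what replacing the second `asByteArray()` call with `value.length` achieves.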



##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java:
##########
@@ -84,7 +84,7 @@ static StreamMessageMetadata extractMessageMetadata(Message<byte[]> message, Pul
     MessageIdStreamOffset nextOffset = new MessageIdStreamOffset(getNextMessageId(messageId));
     StreamMessageMetadata.Builder builder =
         new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(recordIngestionTimeMs)
-            .setOffset(offset, nextOffset);
+            .setOffset(offset, nextOffset).setSerializedValueSize(message.size());

Review Comment:
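   The `message` variable is definitely not `null`: it is already dereferenced several times earlier in the same method (for example `message.getBrokerPublishTime()` and `message.getMessageId()`) before `message.size()` is called in the code change: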
   ```
     @VisibleForTesting
     static StreamMessageMetadata extractMessageMetadata(Message<byte[]> message, PulsarConfig config) {
       // message is dereferenced here, so it is known to be non-null below
       long recordIngestionTimeMs = message.getBrokerPublishTime().orElse(message.getPublishTime());
       MessageId messageId = message.getMessageId();
       MessageIdStreamOffset offset = new MessageIdStreamOffset(messageId);
       MessageIdStreamOffset nextOffset = new MessageIdStreamOffset(getNextMessageId(messageId));
       StreamMessageMetadata.Builder builder =
           new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(recordIngestionTimeMs)
               .setOffset(offset, nextOffset).setSerializedValueSize(message.size());
   ```
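The null-safety argument in this comment rests on a simple rule: once a reference has been dereferenced without throwing, later uses of it in the same method cannot observe `null`, as long as the variable is not reassigned. The sketch below models that with a hypothetical `Msg` interface standing in for Pulsar's `Message<byte[]>`; its method names are illustrative and are not the Pulsar client API.

```java
import java.util.Optional;

// Sketch only: Msg is a hypothetical stand-in, not Pulsar's Message API.
public class DereferenceSketch {

  /** Hypothetical message type with just the accessors the sketch needs. */
  interface Msg {
    Optional<Long> brokerPublishTime();

    long publishTime();

    int size();
  }

  /** Same shape as the reviewed method: message is dereferenced before size() is called. */
  static long[] extract(Msg message) {
    // First dereference: a null message throws NullPointerException right here.
    long ingestionTimeMs = message.brokerPublishTime().orElse(message.publishTime());
    // Reaching this line proves message is non-null, so size() needs no extra guard.
    long serializedValueSize = message.size();
    return new long[] {ingestionTimeMs, serializedValueSize};
  }

  /** Builds a hypothetical message; a null brokerTime models a missing broker timestamp. */
  static Msg of(Long brokerTime, long publishTime, int size) {
    return new Msg() {
      @Override
      public Optional<Long> brokerPublishTime() {
        return Optional.ofNullable(brokerTime);
      }

      @Override
      public long publishTime() {
        return publishTime;
      }

      @Override
      public int size() {
        return size;
      }
    };
  }

  public static void main(String[] args) {
    long[] result = extract(of(null, 7L, 12));
    System.out.println(result[0] + " " + result[1]); // prints "7 12"
    try {
      extract(null);
    } catch (NullPointerException e) {
      System.out.println("null message fails at the first dereference");
    }
  }
}
```

Calling `extract(null)` throws `NullPointerException` at the first `message.brokerPublishTime()` call, so the later `message.size()` never needs a guard of its own.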



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

