kishoreg commented on a change in pull request #5430:
URL: https://github.com/apache/incubator-pinot/pull/5430#discussion_r436887390



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -466,19 +469,28 @@ private void processStreamEvents(MessageBatch 
messagesAndOffsets, long idlePipeS
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
         try {
-          GenericRow transformedRow = _recordTransformer.transform(decodedRow);
-
-          if (transformedRow != null) {
-            realtimeRowsConsumedMeter = _serverMetrics
-                .addMeteredTableValue(_metricKeyName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
-            indexedMessageCount++;
+          List<GenericRow> transformedRows;
+          if 
(decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY) != null) {
+            transformedRows = new ArrayList<>();
+            for (Object singleRow : (Collection) 
decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY)) {
+              transformedRows.add(_recordTransformer.transform((GenericRow) 
singleRow));
+            }
           } else {
-            realtimeRowsDroppedMeter = _serverMetrics
-                .addMeteredTableValue(_metricKeyName, 
ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                    realtimeRowsDroppedMeter);
+            transformedRows = 
Collections.singletonList(_recordTransformer.transform(decodedRow));

Review comment:
       is there a way to avoid creating this list. 99% will not have list.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -466,19 +469,28 @@ private void processStreamEvents(MessageBatch 
messagesAndOffsets, long idlePipeS
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
         try {
-          GenericRow transformedRow = _recordTransformer.transform(decodedRow);
-
-          if (transformedRow != null) {
-            realtimeRowsConsumedMeter = _serverMetrics
-                .addMeteredTableValue(_metricKeyName, 
ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
-            indexedMessageCount++;
+          List<GenericRow> transformedRows;
+          if 
(decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY) != null) {
+            transformedRows = new ArrayList<>();
+            for (Object singleRow : (Collection) 
decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY)) {
+              transformedRows.add(_recordTransformer.transform((GenericRow) 
singleRow));
+            }
           } else {
-            realtimeRowsDroppedMeter = _serverMetrics
-                .addMeteredTableValue(_metricKeyName, 
ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                    realtimeRowsDroppedMeter);
+            transformedRows = 
Collections.singletonList(_recordTransformer.transform(decodedRow));
           }
 
-          canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+          for (GenericRow transformedRow : transformedRows) {
+            if (transformedRow != null) {

Review comment:
       maybe extract this method into collectRow similar to offline indexing 
and invoke it directly to avoid the penalty of creating a list

##########
File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -363,5 +363,7 @@
 
     @Deprecated
     public static final String TABLE_NAME = "segment.table.name";
+
+    public static final String MULTIPLE_RECORDS_KEY = "multiple.records.key";

Review comment:
       put this in GenericRow?
   multiple.records.key -> $MULTIPLE_RECORDS_KEY$ avoid '.'
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to