kishoreg commented on a change in pull request #5430: URL: https://github.com/apache/incubator-pinot/pull/5430#discussion_r436887390
########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -466,19 +469,28 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS messagesAndOffsets.getMessageLengthAtIndex(index), reuse); if (decodedRow != null) { try { - GenericRow transformedRow = _recordTransformer.transform(decodedRow); - - if (transformedRow != null) { - realtimeRowsConsumedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); - indexedMessageCount++; + List<GenericRow> transformedRows; + if (decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY) != null) { + transformedRows = new ArrayList<>(); + for (Object singleRow : (Collection) decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY)) { + transformedRows.add(_recordTransformer.transform((GenericRow) singleRow)); + } } else { - realtimeRowsDroppedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, - realtimeRowsDroppedMeter); + transformedRows = Collections.singletonList(_recordTransformer.transform(decodedRow)); Review comment: is there a way to avoid creating this list. 99% will not have list. ########## File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java ########## @@ -466,19 +469,28 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS messagesAndOffsets.getMessageLengthAtIndex(index), reuse); if (decodedRow != null) { try { - GenericRow transformedRow = _recordTransformer.transform(decodedRow); - - if (transformedRow != null) { - realtimeRowsConsumedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); - indexedMessageCount++; + List<GenericRow> transformedRows; + if (decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY) != null) { + transformedRows = new ArrayList<>(); + for (Object singleRow : (Collection) decodedRow.getValue(CommonConstants.Segment.MULTIPLE_RECORDS_KEY)) { + transformedRows.add(_recordTransformer.transform((GenericRow) singleRow)); + } } else { - realtimeRowsDroppedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, - realtimeRowsDroppedMeter); + transformedRows = Collections.singletonList(_recordTransformer.transform(decodedRow)); } - canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); + for (GenericRow transformedRow : transformedRows) { + if (transformedRow != null) { Review comment: maybe extract this method into collectRow similar to offline indexing and invoke it directly to avoid the penalty of creating a list ########## File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java ########## @@ -363,5 +363,7 @@ @Deprecated public static final String TABLE_NAME = "segment.table.name"; + + public static final String MULTIPLE_RECORDS_KEY = "multiple.records.key"; Review comment: put this in GenericRow? multiple.records.key -> $MULTIPLE_RECORDS_KEY$ avoid '.' ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org