mcvsubbu commented on code in PR #9224:
URL: https://github.com/apache/pinot/pull/9224#discussion_r979468325


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java:
##########
@@ -16,27 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.stream.kafka20;
+package org.apache.pinot.spi.stream;
 
-import java.nio.ByteBuffer;
-import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
-import org.apache.pinot.spi.stream.RowMetadata;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
-public class MessageAndOffsetAndMetadata extends MessageAndOffset {
-  private final RowMetadata _rowMetadata;
+/**
+ * A container class for holding the result of a decoder
+ * At any point in time, only one of Result or exception is set as null.
+ */
+public final class StreamDataDecoderResult {
+  private final GenericRow _result;
+  private final Exception _exception;

Review Comment:
   You have an exception declared here. I see that the decode() operation is guaranteed not to throw an exception. You may also want to add a comment on the decode() interface stating that it is NOT supposed to throw an exception (just to guard against someone changing it).
   
   Also, instead of a generic exception, I suggest that we define some specific ones (or at least specific categories), such as retryable vs. non-retryable. See PR #9051.
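The following is a minimal sketch of the suggestion above, under stated assumptions. Decoding failures carry a category (retryable or not) instead of a bare `Exception`, and the result holder enforces that exactly one of row or exception is set. `ErrorCategory`, `DecodeException` and `DecoderResult` are hypothetical names. `Map<String, Object>` stands in for Pinot's `GenericRow` so the code compiles on its own. The categories actually adopted (see PR #9051) may differ.

```java
import java.util.Map;
import java.util.Objects;

public class DecoderResultSketch {
  // Hypothetical categories; the review only asks for "retryable or not".
  enum ErrorCategory { RETRYABLE, NON_RETRYABLE }

  // Hypothetical exception type that carries a category instead of a bare Exception.
  static final class DecodeException extends Exception {
    private final ErrorCategory _category;

    DecodeException(String message, ErrorCategory category, Throwable cause) {
      super(message, cause);
      _category = Objects.requireNonNull(category);
    }

    ErrorCategory getCategory() {
      return _category;
    }
  }

  // Simplified stand-in for StreamDataDecoderResult: exactly one of row/exception is non-null.
  static final class DecoderResult {
    private final Map<String, Object> _row;
    private final DecodeException _exception;

    private DecoderResult(Map<String, Object> row, DecodeException exception) {
      if ((row == null) == (exception == null)) {
        throw new IllegalArgumentException("Exactly one of row or exception must be set");
      }
      _row = row;
      _exception = exception;
    }

    static DecoderResult success(Map<String, Object> row) {
      return new DecoderResult(Objects.requireNonNull(row), null);
    }

    static DecoderResult failure(DecodeException exception) {
      return new DecoderResult(null, Objects.requireNonNull(exception));
    }

    Map<String, Object> getRow() {
      return _row;
    }

    DecodeException getException() {
      return _exception;
    }

    // True only for failures that the caller may retry (e.g. after refreshing a schema).
    boolean isRetryable() {
      return _exception != null && _exception.getCategory() == ErrorCategory.RETRYABLE;
    }
  }

  public static void main(String[] args) {
    DecoderResult ok = DecoderResult.success(Map.of("id", 1));
    DecoderResult bad = DecoderResult.failure(
        new DecodeException("schema not found", ErrorCategory.RETRYABLE, null));
    System.out.println(ok.getRow());       // {id=1}
    System.out.println(bad.isRetryable()); // true
  }
}
```

Enforcing the invariant in a private constructor behind two factory methods means callers can never build a result with both fields set, or with neither set.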
   



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -39,5 +44,27 @@ public interface RowMetadata {
    * @return timestamp (epoch in milliseconds) when the row was ingested upstream
    *         Long.MIN_VALUE if not available
    */
-  long getIngestionTimeMs();
+  long getRecordTimestampMs();

Review Comment:
   What is wrong with `getIngestionTimeMs()`? I think it conveys the semantics perfectly well, so I don't see a need to rename it to `getRecordTimestampMs()`.
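Whatever the accessor is called, the Javadoc in the hunk defines a sentinel: `Long.MIN_VALUE` means "not available". The sketch below, written against the original name `getIngestionTimeMs()`, shows the check callers need before treating the value as a real epoch. The nested `RowMetadata` interface and the `hasIngestionTime` helper are hypothetical stand-ins, not Pinot's actual classes.

```java
public class RowMetadataTimestampSketch {
  // Hypothetical, trimmed-down stand-in for Pinot's RowMetadata; only the accessor under discussion.
  interface RowMetadata {
    /**
     * @return timestamp (epoch in milliseconds) when the row was ingested upstream,
     *         Long.MIN_VALUE if not available
     */
    long getIngestionTimeMs();
  }

  // Hypothetical helper: Long.MIN_VALUE is the "unknown" sentinel, never a real timestamp.
  static boolean hasIngestionTime(RowMetadata metadata) {
    return metadata != null && metadata.getIngestionTimeMs() != Long.MIN_VALUE;
  }

  public static void main(String[] args) {
    RowMetadata known = () -> 1_664_000_000_000L;
    RowMetadata unknown = () -> Long.MIN_VALUE;
    System.out.println(hasIngestionTime(known));   // true
    System.out.println(hasIngestionTime(unknown)); // false
    System.out.println(hasIngestionTime(null));    // false
  }
}
```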



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -544,16 +547,16 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
 
       // Index each message
       reuse.clear();
-      // retrieve metadata from the message batch if available
-      // this can be overridden by the decoder if there is a better indicator in the message payload
-      RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
-
-      GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
-              messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
-      if (decodedRow != null) {
+      StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
+      RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      if (decodedRow.getException() != null) {
+        // TODO: handle exception as we do today - do we silently drop the record or throw exception?

Review Comment:
   This area is a little wild. A couple of earlier PRs made it even messier (someone needed decoder exceptions to trigger a retry that fetches a new schema). It would be great if you could add Javadocs as appropriate to the decoder interfaces that clearly indicate how exceptions will be treated by this class.
   
   Thanks
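One way to make that contract explicit is sketched below, under stated assumptions. The decoder interface's Javadoc states that implementations never throw, and the consuming loop alone decides, through a policy, whether a failed message is dropped or stops consumption. `Decoder`, `Result`, `OnError` and `consume` are hypothetical names. The `SKIP`/`FAIL` policy only models the "silently drop or throw" question from the TODO in the diff, not current Pinot behavior.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DecoderContractSketch {
  /**
   * Hypothetical decoder contract, loosely modeled on the decode() call in the diff.
   *
   * <p>Implementations must NOT throw. A message that cannot be decoded is reported through
   * the returned {@link Result}; the consuming loop, not the decoder, decides whether that
   * message is skipped or consumption stops.
   */
  interface Decoder {
    Result decode(byte[] payload);
  }

  /** Simplified result holder: exactly one of row / error is non-null. */
  static final class Result {
    final String row;
    final Exception error;

    private Result(String row, Exception error) {
      this.row = row;
      this.error = error;
    }

    static Result success(String row) {
      return new Result(row, null);
    }

    static Result failure(Exception error) {
      return new Result(null, error);
    }
  }

  /** Hypothetical policy knob standing in for "silently drop the record or throw". */
  enum OnError { SKIP, FAIL }

  /** Decodes a batch and returns the rows that would be indexed. */
  static List<String> consume(List<byte[]> batch, Decoder decoder, OnError policy) {
    List<String> indexed = new ArrayList<>();
    for (byte[] payload : batch) {
      Result result = decoder.decode(payload);
      if (result.error != null) {
        if (policy == OnError.FAIL) {
          throw new IllegalStateException("Failed to decode message", result.error);
        }
        continue; // SKIP: drop this message and move on to the next one
      }
      indexed.add(result.row);
    }
    return indexed;
  }

  public static void main(String[] args) {
    // Toy decoder: treats the payload as UTF-8 text and rejects empty payloads.
    Decoder utf8 = payload -> payload.length == 0
        ? Result.failure(new IllegalArgumentException("empty payload"))
        : Result.success(new String(payload, StandardCharsets.UTF_8));
    List<byte[]> batch = List.of(
        "a".getBytes(StandardCharsets.UTF_8), new byte[0], "b".getBytes(StandardCharsets.UTF_8));
    List<String> rows = consume(batch, utf8, OnError.SKIP);
    System.out.println(rows + ", skipped=" + (batch.size() - rows.size())); // [a, b], skipped=1
  }
}
```

Keeping the drop-or-fail decision in the caller lets each decoder stay simple. It also gives the policy a single place to be documented.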



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -31,6 +32,8 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RowMetadata {
+  GenericRow EMPTY_ROW = new GenericRow();

Review Comment:
   +1, we should find ways to remove `RowMetadata` now that a more generic class is being introduced. Can we mark it as deprecated?
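In plain Java, deprecating the interface usually means pairing two markers. The `@Deprecated` annotation makes the compiler warn callers. The `@deprecated` Javadoc tag tells plugin authors what to migrate to. The trimmed interface below is a hypothetical stand-in for `RowMetadata`. The replacement named in the Javadoc is left generic because the review does not name it.

```java
public class DeprecationSketch {
  /**
   * Hypothetical, trimmed-down stand-in for RowMetadata, shown only to illustrate the markers.
   *
   * @deprecated superseded by the more generic stream-message class introduced in this PR;
   *             kept only for backward compatibility with existing plugins.
   */
  @Deprecated
  interface RowMetadata {
    long getIngestionTimeMs();
  }

  public static void main(String[] args) {
    // @Deprecated has runtime retention, so reflection can see it.
    System.out.println(RowMetadata.class.isAnnotationPresent(Deprecated.class)); // true
  }
}
```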



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

