ddcprg commented on code in PR #9051:
URL: https://github.com/apache/pinot/pull/9051#discussion_r926458882


##########
pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java:
##########
@@ -103,12 +107,37 @@ public void init(Map<String, String> props, Set<String> fieldsToRead, String top
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload);
-    return _avroRecordExtractor.extract(avroRecord, destination);
+    try {
+      Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload);
+      return _avroRecordExtractor.extract(avroRecord, destination);
+    } catch (RuntimeException e) {
+      ignoreOrRethrowException(e);
+      return null;
+    }
   }
 
   @Override
   public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
     return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
   }
+
+  /**
+   * This method handles specific serialisation exceptions. If the exception cannot be ignored the method
+   * re-throws the exception.
+   *
+   * @param e exception to handle
+   */
+  private void ignoreOrRethrowException(RuntimeException e) {
+    if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+      // Do nothing, the message is not an Avro message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+      return;

Review Comment:
   Actually, there is already an error counter in the LL data manager. Unless we
   want a counter per error type, this won't be necessary right now, if you agree.
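
   For reference, if a counter per error type were ever wanted, a minimal sketch
   could look like the one below. It is only an illustration: the class
   `DecodeErrorCounters` and its `record` and `count` methods are hypothetical
   names, not part of Pinot or of this PR, and keying by the exception's simple
   class name is an assumption about what "error type" would mean here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper (not part of Pinot): counts decode failures per exception type.
public class DecodeErrorCounters {
  // One thread-safe adder per exception class name.
  private final Map<String, LongAdder> _counts = new ConcurrentHashMap<>();

  /** Records one failure, keyed by the simple class name of the exception. */
  public void record(Throwable t) {
    _counts.computeIfAbsent(t.getClass().getSimpleName(), k -> new LongAdder()).increment();
  }

  /** Returns how many failures of the given type have been recorded so far. */
  public long count(Class<? extends Throwable> type) {
    LongAdder adder = _counts.get(type.getSimpleName());
    return adder == null ? 0L : adder.sum();
  }
}
```

   The decoder would call `record(e)` just before discarding a row. The existing
   counter in the LL data manager already covers the aggregate case, so this
   would only matter if a breakdown by cause became useful.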



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
