ddcprg commented on code in PR #9051: URL: https://github.com/apache/pinot/pull/9051#discussion_r926458882
########## pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java: ########## @@ -103,12 +107,37 @@ public void init(Map<String, String> props, Set<String> fieldsToRead, String top @Override public GenericRow decode(byte[] payload, GenericRow destination) { - Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload); - return _avroRecordExtractor.extract(avroRecord, destination); + try { + Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload); + return _avroRecordExtractor.extract(avroRecord, destination); + } catch (RuntimeException e) { + ignoreOrRethrowException(e); + return null; + } } @Override public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); } + + /** + * This method handles specific serialisation exceptions. If the exception cannot be ignored the method + * re-throws the exception. + * + * @param e exception to handle + */ + private void ignoreOrRethrowException(RuntimeException e) { + if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) { + // Do nothing, the message is not an Avro message and can't be decoded + LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e); + return; Review Comment: Actually there is an error counter in LL data manager already,unless we want to get a counter per error type this won't be necessary right now if you agree -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org