walterddr commented on code in PR #9516: URL: https://github.com/apache/pinot/pull/9516#discussion_r985994470
########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java: ########## @@ -129,29 +130,39 @@ protected void pushAvroIntoKafka(List<File> avroFiles) "io.confluent.kafka.serializers.KafkaAvroSerializer"); Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps); + // this producer produces intentionally malformatted records so that + // we can test the behavior when consuming such records Properties nonAvroProducerProps = new Properties(); nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps); + Producer<byte[], byte[]> invalidDataProducer = new KafkaProducer<>(nonAvroProducerProps); if (injectTombstones()) { // publish lots of tombstones to livelock the consumer if it can't handle this properly for (int i = 0; i < 1000; i++) { // publish a tombstone first - nonAvroProducer.send( + avroProducer.send( new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null)); } } + for (File avroFile : avroFiles) { + int numInvalidRecords = 0; try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) { for (GenericRecord genericRecord : reader) { byte[] keyBytes = (getPartitionColumn() == null) ? 
Longs.toByteArray(System.currentTimeMillis()) : (genericRecord.get(getPartitionColumn())).toString().getBytes(); - // Ignore getKafkaMessageHeader() - nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8))); + + if (numInvalidRecords < NUM_INVALID_RECORDS) { + // send a few rubbish records to validate that the consumer will skip over non-avro records, but + // don't spam them every time as it causes log spam Review Comment: Do we still need to change the test behavior, given that you have already suppressed these log messages in the log4j2.xml config? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org