agavra commented on code in PR #9516: URL: https://github.com/apache/pinot/pull/9516#discussion_r986046996
########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java: ########## @@ -129,29 +130,39 @@ protected void pushAvroIntoKafka(List<File> avroFiles) "io.confluent.kafka.serializers.KafkaAvroSerializer"); Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps); + // this producer produces intentionally malformatted records so that + // we can test the behavior when consuming such records Properties nonAvroProducerProps = new Properties(); nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps); + Producer<byte[], byte[]> invalidDataProducer = new KafkaProducer<>(nonAvroProducerProps); if (injectTombstones()) { // publish lots of tombstones to livelock the consumer if it can't handle this properly for (int i = 0; i < 1000; i++) { // publish a tombstone first - nonAvroProducer.send( + avroProducer.send( new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null)); } } + for (File avroFile : avroFiles) { + int numInvalidRecords = 0; try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) { for (GenericRecord genericRecord : reader) { byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis()) : (genericRecord.get(getPartitionColumn())).toString().getBytes(); - // Ignore getKafkaMessageHeader() - nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8))); + + if (numInvalidRecords < NUM_INVALID_RECORDS) { + // send a few rubbish records to validate that the consumer will skip over non-avro records, but + // don't spam them every time as it causes log spam Review Comment: I think we should - this will allow us to re-enable the logs (helpful for debugging in the future) and the test should run faster since it's producing way less garbage to kafka -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org