walterddr commented on code in PR #9516:
URL: https://github.com/apache/pinot/pull/9516#discussion_r985994470


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java:
##########
@@ -129,29 +130,39 @@ protected void pushAvroIntoKafka(List<File> avroFiles)
         "io.confluent.kafka.serializers.KafkaAvroSerializer");
     Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps);
 
+    // this producer intentionally emits malformed records so that
+    // we can test the consumer's behavior when it encounters them
     Properties nonAvroProducerProps = new Properties();
     nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
     nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
     nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
-    Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps);
+    Producer<byte[], byte[]> invalidDataProducer = new KafkaProducer<>(nonAvroProducerProps);
 
     if (injectTombstones()) {
       // publish lots of tombstones to livelock the consumer if it can't handle this properly
       for (int i = 0; i < 1000; i++) {
         // publish a tombstone first
-        nonAvroProducer.send(
+        avroProducer.send(
             new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null));
       }
     }
+
     for (File avroFile : avroFiles) {
+      int numInvalidRecords = 0;
       try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
         for (GenericRecord genericRecord : reader) {
          byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis())
              : (genericRecord.get(getPartitionColumn())).toString().getBytes();
-          // Ignore getKafkaMessageHeader()
-          nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
+
+          if (numInvalidRecords < NUM_INVALID_RECORDS) {
+            // send a few rubbish records to validate that the consumer skips over
+            // non-avro records, but cap the count so we don't flood the logs on every run

Review Comment:
   Do we still need to change the test behavior, given that you already suppressed these errors in the log4j2.xml config?
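   
   For reference, a minimal sketch of the kind of log4j2.xml override being discussed. This is only a fragment, and the logger name below is an assumption; it would need to match whichever class actually logs the decode failures in this test:
   
       <Loggers>
         <!-- assumed logger name: suppresses decode-error spam caused by the
              intentionally malformed test records -->
         <Logger name="org.apache.pinot.plugin.inputformat.avro.confluent" level="off"/>
         <Root level="info"/>
       </Loggers>
   
   Note that turning the logger off entirely would hide genuine decode failures as well, which may be a reason to keep the record cap regardless.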



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
