This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe4a9b8a4 fix spammy logs for KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest (#9516)
3fe4a9b8a4 is described below

commit 3fe4a9b8a49db17efae7716a1d12feee9f2e13a6
Author: Almog Gavra <almog.ga...@gmail.com>
AuthorDate: Tue Oct 4 13:35:15 2022 -0700

    fix spammy logs for KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest (#9516)
---
 ...oMessageDecoderRealtimeClusterIntegrationTest.java | 19 +++++++++++++++----
 pinot-integration-tests/src/test/resources/log4j2.xml |  5 -----
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
index 21627741f1..2e021aacde 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
@@ -75,6 +75,7 @@ public class 
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr
   private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = 
Collections.singletonList("DivActualElapsedTime");
   private static final long RANDOM_SEED = System.currentTimeMillis();
   private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final int NUM_INVALID_RECORDS = 5;
 
   private final boolean _isDirectAlloc = RANDOM.nextBoolean();
   private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
@@ -129,29 +130,39 @@ public class 
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr
         "io.confluent.kafka.serializers.KafkaAvroSerializer");
     Producer<byte[], GenericRecord> avroProducer = new 
KafkaProducer<>(avroProducerProps);
 
+    // this producer produces intentionally malformatted records so that
+    // we can test the behavior when consuming such records
     Properties nonAvroProducerProps = new Properties();
     nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:" + getKafkaPort());
     nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
     nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
-    Producer<byte[], byte[]> nonAvroProducer = new 
KafkaProducer<>(nonAvroProducerProps);
+    Producer<byte[], byte[]> invalidDataProducer = new 
KafkaProducer<>(nonAvroProducerProps);
 
     if (injectTombstones()) {
       // publish lots of tombstones to livelock the consumer if it can't 
handle this properly
       for (int i = 0; i < 1000; i++) {
         // publish a tombstone first
-        nonAvroProducer.send(
+        avroProducer.send(
             new ProducerRecord<>(getKafkaTopic(), 
Longs.toByteArray(System.currentTimeMillis()), null));
       }
     }
+
     for (File avroFile : avroFiles) {
+      int numInvalidRecords = 0;
       try (DataFileStream<GenericRecord> reader = 
AvroUtils.getAvroReader(avroFile)) {
         for (GenericRecord genericRecord : reader) {
           byte[] keyBytes = (getPartitionColumn() == null) ? 
Longs.toByteArray(System.currentTimeMillis())
               : 
(genericRecord.get(getPartitionColumn())).toString().getBytes();
-          // Ignore getKafkaMessageHeader()
-          nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, 
"Rubbish".getBytes(UTF_8)));
+
+          if (numInvalidRecords < NUM_INVALID_RECORDS) {
+            // send a few rubbish records to validate that the consumer will 
skip over non-avro records, but
+            // don't spam them every time as it causes log spam
+            invalidDataProducer.send(new ProducerRecord<>(getKafkaTopic(), 
keyBytes, "Rubbish".getBytes(UTF_8)));
+            numInvalidRecords++;
+          }
+
           avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, 
genericRecord));
         }
       }
diff --git a/pinot-integration-tests/src/test/resources/log4j2.xml b/pinot-integration-tests/src/test/resources/log4j2.xml
index b12865cc65..439331f9d7 100644
--- a/pinot-integration-tests/src/test/resources/log4j2.xml
+++ b/pinot-integration-tests/src/test/resources/log4j2.xml
@@ -26,11 +26,6 @@
     </Console>
   </Appenders>
   <Loggers>
-    <!--Turn off the logger for KafkaConfluentSchemaRegistryAvroMessageDecoder 
because we intentionally inject
-     tombstones in 
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest 
which can flood the log
-     -->
-    <Logger 
name="org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder"
-            level="off" additivity="false"/>
     <Logger name="org.apache.pinot" level="warn" additivity="false">
       <AppenderRef ref="console"/>
     </Logger>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to