This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3fe4a9b8a4 fix spammy logs for KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest (#9516) 3fe4a9b8a4 is described below commit 3fe4a9b8a49db17efae7716a1d12feee9f2e13a6 Author: Almog Gavra <almog.ga...@gmail.com> AuthorDate: Tue Oct 4 13:35:15 2022 -0700 fix spammy logs for KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest (#9516) --- ...oMessageDecoderRealtimeClusterIntegrationTest.java | 19 +++++++++++++++---- pinot-integration-tests/src/test/resources/log4j2.xml | 5 ----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java index 21627741f1..2e021aacde 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java @@ -75,6 +75,7 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime"); private static final long RANDOM_SEED = System.currentTimeMillis(); private static final Random RANDOM = new Random(RANDOM_SEED); + private static final int NUM_INVALID_RECORDS = 5; private final boolean _isDirectAlloc = RANDOM.nextBoolean(); private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); @@ -129,29 +130,39 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegr "io.confluent.kafka.serializers.KafkaAvroSerializer"); Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps); + // this producer produces intentionally malformatted records so that + // we can test the behavior when consuming such records Properties nonAvroProducerProps = new Properties(); nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps); + Producer<byte[], byte[]> invalidDataProducer = new KafkaProducer<>(nonAvroProducerProps); if (injectTombstones()) { // publish lots of tombstones to livelock the consumer if it can't handle this properly for (int i = 0; i < 1000; i++) { // publish a tombstone first - nonAvroProducer.send( + avroProducer.send( new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null)); } } + for (File avroFile : avroFiles) { + int numInvalidRecords = 0; try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) { for (GenericRecord genericRecord : reader) { byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis()) : (genericRecord.get(getPartitionColumn())).toString().getBytes(); - // Ignore getKafkaMessageHeader() - nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8))); + + if (numInvalidRecords < NUM_INVALID_RECORDS) { + // send a few rubbish records to validate that the consumer will skip over non-avro records, but + // don't spam them every time as it causes log spam + invalidDataProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8))); + numInvalidRecords++; + } + avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord)); } } diff --git a/pinot-integration-tests/src/test/resources/log4j2.xml b/pinot-integration-tests/src/test/resources/log4j2.xml index b12865cc65..439331f9d7 100644 --- a/pinot-integration-tests/src/test/resources/log4j2.xml +++ b/pinot-integration-tests/src/test/resources/log4j2.xml @@ -26,11 +26,6 @@ </Console> </Appenders> <Loggers> - <!--Turn off the logger for KafkaConfluentSchemaRegistryAvroMessageDecoder because we intentionally inject - tombstones in KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest which can flood the log - --> - <Logger name="org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder" - level="off" additivity="false"/> <Logger name="org.apache.pinot" level="warn" additivity="false"> <AppenderRef ref="console"/> </Logger> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org