This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bbec0a8070 Handle unknown magic byte error in Confluent Avro decoder (#9045) (#9051)
bbec0a8070 is described below

commit bbec0a8070ce1d689a382195b291d68ca945c040
Author: Daniel del Castillo <ddc...@users.noreply.github.com>
AuthorDate: Sun Jul 24 16:49:21 2022 +0100

    Handle unknown magic byte error in Confluent Avro decoder (#9045) (#9051)
    
    * Handle unknown magic byte error in Confluent Avro decoder (#9045)
    
    * Update StreamMessageDecoder documentation
---
 pinot-integration-tests/pom.xml                    |  25 ++
 ...ssageDecoderRealtimeClusterIntegrationTest.java | 313 +++++++++++++++++++++
 .../schemaregistry/SchemaRegistryStarter.java      | 105 +++++++
 ...aConfluentSchemaRegistryAvroMessageDecoder.java |  33 ++-
 .../pinot/spi/stream/StreamMessageDecoder.java     |   9 +-
 5 files changed, 481 insertions(+), 4 deletions(-)

diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index a67a222d5b..3da652cff0 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -37,6 +37,7 @@
     <localstack-utils.version>0.2.19</localstack-utils.version>
     <awaitility.version>3.0.0</awaitility.version>
     <aws.sdk.version>2.14.28</aws.sdk.version>
+    <testcontainers.version>1.17.3</testcontainers.version>
   </properties>
 
   <build>
@@ -323,6 +324,30 @@
       <artifactId>pinot-yammer</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-confluent-avro</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.java.dev.jna</groupId>
+          <artifactId>jna</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>cloud.localstack</groupId>
       <artifactId>localstack-utils</artifactId>
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000000..21627741f1
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.primitives.Longs;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.controller.ControllerConf;
+import 
org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import 
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test that extends RealtimeClusterIntegrationTest but uses 
low-level Kafka consumer.
+ * TODO: Add separate module-level tests and remove the randomness of this test
+ */
+public class 
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest
+    extends RealtimeClusterIntegrationTest {
+  private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+  private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
+      "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = 
Collections.singletonList("DivActualElapsedTime");
+  private static final long RANDOM_SEED = System.currentTimeMillis();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+
+  private final boolean _isDirectAlloc = RANDOM.nextBoolean();
+  private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+  private final boolean _enableSplitCommit = RANDOM.nextBoolean();
+  private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+  private final long _startTime = System.currentTimeMillis();
+  private SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry;
+
+  @Override
+  protected int getNumKafkaBrokers() {
+    return 1;
+  }
+
+  @Override
+  protected void startKafka() {
+    super.startKafka();
+    startSchemaRegistry();
+  }
+
+  @Override
+  protected void stopKafka() {
+    stopSchemaRegistry();
+    super.stopKafka();
+  }
+
+  private void startSchemaRegistry() {
+    if (_schemaRegistry == null) {
+      _schemaRegistry = 
SchemaRegistryStarter.startLocalInstance(SchemaRegistryStarter.DEFAULT_PORT);
+    }
+  }
+
+  private void stopSchemaRegistry() {
+    try {
+      if (_schemaRegistry != null) {
+        _schemaRegistry.stop();
+        _schemaRegistry = null;
+      }
+    } catch (Exception e) {
+      // Swallow exceptions
+    }
+  }
+
+  @Override
+  protected void pushAvroIntoKafka(List<File> avroFiles)
+      throws Exception {
+    Properties avroProducerProps = new Properties();
+    avroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:" + getKafkaPort());
+    
avroProducerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, 
_schemaRegistry.getUrl());
+    avroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    avroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "io.confluent.kafka.serializers.KafkaAvroSerializer");
+    Producer<byte[], GenericRecord> avroProducer = new 
KafkaProducer<>(avroProducerProps);
+
+    Properties nonAvroProducerProps = new Properties();
+    nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:" + getKafkaPort());
+    nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    Producer<byte[], byte[]> nonAvroProducer = new 
KafkaProducer<>(nonAvroProducerProps);
+
+    if (injectTombstones()) {
+      // publish lots of tombstones to livelock the consumer if it can't 
handle this properly
+      for (int i = 0; i < 1000; i++) {
+        // publish a tombstone first
+        nonAvroProducer.send(
+            new ProducerRecord<>(getKafkaTopic(), 
Longs.toByteArray(System.currentTimeMillis()), null));
+      }
+    }
+    for (File avroFile : avroFiles) {
+      try (DataFileStream<GenericRecord> reader = 
AvroUtils.getAvroReader(avroFile)) {
+        for (GenericRecord genericRecord : reader) {
+          byte[] keyBytes = (getPartitionColumn() == null) ? 
Longs.toByteArray(System.currentTimeMillis())
+              : 
(genericRecord.get(getPartitionColumn())).toString().getBytes();
+          // Ignore getKafkaMessageHeader()
+          nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, 
"Rubbish".getBytes(UTF_8)));
+          avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, 
genericRecord));
+        }
+      }
+    }
+  }
+
+  @Override
+  protected Map<String, String> getStreamConfigs() {
+    Map<String, String> streamConfigMap = super.getStreamConfigs();
+    String streamType = "kafka";
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, 
StreamConfigProperties.STREAM_DECODER_CLASS),
+        KafkaConfluentSchemaRegistryAvroMessageDecoder.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.prop.schema.registry.rest.url", 
_schemaRegistry.getUrl());
+    return streamConfigMap;
+  }
+
+  @Override
+  protected boolean injectTombstones() {
+    return true;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected String getLoadMode() {
+    return ReadMode.mmap.name();
+  }
+
+  @Override
+  public void startController()
+      throws Exception {
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+
+    properties.put(ControllerConf.ALLOW_HLC_TABLES, false);
+    properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit);
+
+    startController(properties);
+    
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration configuration) {
+    
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION,
 true);
+    
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION,
 _isDirectAlloc);
+    if (_isConsumerDirConfigured) {
+      configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, 
CONSUMER_DIRECTORY);
+    }
+    if (_enableSplitCommit) {
+      
configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, 
true);
+      
configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA,
 true);
+    }
+  }
+
+  @Override
+  protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, 
TableConfig tableConfig)
+      throws Exception {
+    if (!_tarDir.exists()) {
+      _tarDir.mkdir();
+    }
+    if (!_segmentDir.exists()) {
+      _segmentDir.mkdir();
+    }
+
+    // create segments out of the avro files (segments will be placed in 
_tarDir)
+    List<File> copyOfAvroFiles = new ArrayList<>(avroFiles);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, 
tableConfig, schema, 0, _segmentDir, _tarDir);
+
+    // upload segments to controller
+    uploadSegmentsToController(getTableName(), _tarDir, false, false);
+
+    // upload the first segment again to verify refresh
+    uploadSegmentsToController(getTableName(), _tarDir, true, false);
+
+    // upload the first segment again to verify refresh with different segment 
crc
+    uploadSegmentsToController(getTableName(), _tarDir, true, true);
+
+    // add avro files to the original list so H2 will have the uploaded data 
as well
+    avroFiles.addAll(copyOfAvroFiles);
+  }
+
+  private void uploadSegmentsToController(String tableName, File tarDir, 
boolean onlyFirstSegment, boolean changeCrc)
+      throws Exception {
+    File[] segmentTarFiles = tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    int numSegments = segmentTarFiles.length;
+    assertTrue(numSegments > 0);
+    if (onlyFirstSegment) {
+      numSegments = 1;
+    }
+    URI uploadSegmentHttpURI = 
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      if (numSegments == 1) {
+        File segmentTarFile = segmentTarFiles[0];
+        if (changeCrc) {
+          changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString());
+        }
+        assertEquals(
+            fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(), segmentTarFile,
+                tableName, TableType.REALTIME).getStatusCode(), 
HttpStatus.SC_OK);
+      } else {
+        // Upload segments in parallel
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(
+              () -> 
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(),
+                  segmentTarFile, tableName, 
TableType.REALTIME).getStatusCode()));
+        }
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
+      }
+    }
+  }
+
+  private void changeCrcInSegmentZKMetadata(String tableName, String 
segmentFilePath) {
+    int startIdx = segmentFilePath.indexOf("mytable_");
+    int endIdx = segmentFilePath.indexOf(".tar.gz");
+    String segmentName = segmentFilePath.substring(startIdx, endIdx);
+    String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    SegmentZKMetadata segmentZKMetadata = 
_helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+    segmentZKMetadata.setCrc(111L);
+    _helixResourceManager.updateZkMetadata(tableNameWithType, 
segmentZKMetadata);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // all the data that was ingested from Kafka also got uploaded via the 
controller's upload endpoint
+    return super.getCountStarResult() * 2;
+  }
+
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    System.out.println(format(
+        "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, "
+            + "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, 
_isConsumerDirConfigured,
+        _enableSplitCommit, _enableLeadControllerResource));
+
+    // Remove the consumer directory
+    FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
+
+    super.setUp();
+  }
+
+  @AfterClass
+  @Override
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
+    super.tearDown();
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java
new file mode 100644
index 0000000000..9d5c0ab53b
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.kafka.schemaregistry;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+
+public class SchemaRegistryStarter {
+  public static final int DEFAULT_PORT = 8081;
+  private static final String CONFLUENT_PLATFORM_VERSION = "7.2.0";
+  private static final DockerImageName KAFKA_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-kafka:" + 
CONFLUENT_PLATFORM_VERSION);
+  private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE_NAME =
+      DockerImageName.parse("confluentinc/cp-schema-registry:" + 
CONFLUENT_PLATFORM_VERSION);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaRegistryStarter.class);
+
+  private SchemaRegistryStarter() {
+  }
+
+  public static KafkaSchemaRegistryInstance startLocalInstance(int port) {
+    KafkaSchemaRegistryInstance kafkaSchemaRegistry = new 
KafkaSchemaRegistryInstance(port);
+    kafkaSchemaRegistry.start();
+    return kafkaSchemaRegistry;
+  }
+
+  public static class KafkaSchemaRegistryInstance {
+    private final int _port;
+    public KafkaContainer _kafkaContainer;
+    private Network _network;
+    private GenericContainer _schemaRegistryContainer;
+
+    private KafkaSchemaRegistryInstance(int port) {
+      _port = port;
+    }
+
+    public String getUrl() {
+      return "http://" + _schemaRegistryContainer.getHost() + ":" + _schemaRegistryContainer.getMappedPort(_port);
+    }
+
+    public void start() {
+      LOGGER.info("Starting schema registry");
+      if (_kafkaContainer != null || _schemaRegistryContainer != null) {
+        throw new IllegalStateException("Schema registry is already running");
+      }
+
+      _network = Network.newNetwork();
+
+      _kafkaContainer = new 
KafkaContainer(KAFKA_DOCKER_IMAGE_NAME).withNetwork(_network).withNetworkAliases("kafka")
+          .withCreateContainerCmdModifier(it -> 
it.withHostName("kafka")).waitingFor(Wait.forListeningPort());
+      _kafkaContainer.start();
+
+      Map<String, String> schemaRegistryProps = new HashMap<>();
+      schemaRegistryProps.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", 
"kafka:9092");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry");
+      schemaRegistryProps.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + _port);
+      schemaRegistryProps.put("SCHEMA_REGISTRY_DEBUG", "true");
+      _schemaRegistryContainer =
+          new 
GenericContainer(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME).dependsOn(_kafkaContainer).withNetwork(_network)
+              
.withNetworkAliases("schemaregistry").withEnv(schemaRegistryProps).withExposedPorts(_port)
+              .waitingFor(Wait.forListeningPort());
+      _schemaRegistryContainer.start();
+    }
+
+    public void stop() {
+      LOGGER.info("Stopping schema registry");
+      if (_schemaRegistryContainer != null) {
+        _schemaRegistryContainer.stop();
+        _schemaRegistryContainer = null;
+      }
+
+      if (_kafkaContainer != null) {
+        _kafkaContainer.stop();
+        _kafkaContainer = null;
+      }
+
+      if (_network != null) {
+        _network.close();
+      }
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
index 20c7981b44..f666d6401b 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
@@ -31,6 +31,7 @@ import org.apache.avro.generic.GenericData.Record;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractorConfig;
@@ -38,6 +39,8 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -48,6 +51,7 @@ import static com.google.common.base.Preconditions.checkState;
  * NOTE: Do not use schema in the implementation, as schema will be removed 
from the params
  */
 public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements 
StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaConfluentSchemaRegistryAvroMessageDecoder.class);
   private static final String SCHEMA_REGISTRY_REST_URL = 
"schema.registry.rest.url";
   private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry.";
   private KafkaAvroDeserializer _deserializer;
@@ -103,12 +107,37 @@ public class 
KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMes
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    Record avroRecord = (Record) _deserializer.deserialize(_topicName, 
payload);
-    return _avroRecordExtractor.extract(avroRecord, destination);
+    try {
+      Record avroRecord = (Record) _deserializer.deserialize(_topicName, 
payload);
+      return _avroRecordExtractor.extract(avroRecord, destination);
+    } catch (RuntimeException e) {
+      ignoreOrRethrowException(e);
+      return null;
+    }
   }
 
   @Override
   public GenericRow decode(byte[] payload, int offset, int length, GenericRow 
destination) {
     return decode(Arrays.copyOfRange(payload, offset, offset + length), 
destination);
   }
+
+  /**
+   * This method handles specific serialisation exceptions. If the exception 
cannot be ignored the method
+   * re-throws the exception.
+   *
+   * @param e exception to handle
+   */
+  private void ignoreOrRethrowException(RuntimeException e) {
+    if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+      // Do nothing, the message is not an Avro message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+      return;
+    }
+    throw e;
+  }
+
+  private boolean isUnknownMagicByte(Throwable e) {
+    return e != null && e instanceof SerializationException && e.getMessage() 
!= null && e.getMessage().toLowerCase()
+        .contains("unknown magic byte");
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index d2989c8ae8..89312f06b6 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
 
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -52,8 +53,10 @@ public interface StreamMessageDecoder<T> {
    * Decodes a row.
    *
    * @param payload The buffer from which to read the row.
-   * @return A new row decoded from the buffer
+   * @return A new row decoded from the buffer. If the returned value is 
<code>null</code> the row is dropped from the
+   *         segment.
    */
+  @Nullable
   GenericRow decode(T payload, GenericRow destination);
 
   /**
@@ -63,7 +66,9 @@ public interface StreamMessageDecoder<T> {
    * @param offset The offset into the array from which the row contents starts
    * @param length The length of the row contents in bytes
    * @param destination The {@link GenericRow} to write the decoded row into
-   * @return A new row decoded from the buffer
+   * @return A new row decoded from the buffer If the returned value is 
<code>null</code> the row is dropped from the
+   *         segment.
    */
+  @Nullable
   GenericRow decode(T payload, int offset, int length, GenericRow destination);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to