This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bbec0a8070 Handle unknown magic byte error in Confluent Avro decoder (#9045) (#9051) bbec0a8070 is described below commit bbec0a8070ce1d689a382195b291d68ca945c040 Author: Daniel del Castillo <ddc...@users.noreply.github.com> AuthorDate: Sun Jul 24 16:49:21 2022 +0100 Handle unknown magic byte error in Confluent Avro decoder (#9045) (#9051) * Handle unknown magic byte error in Confluent Avro decoder (#9045) * Update StreamMessageDecoder documentation --- pinot-integration-tests/pom.xml | 25 ++ ...ssageDecoderRealtimeClusterIntegrationTest.java | 313 +++++++++++++++++++++ .../schemaregistry/SchemaRegistryStarter.java | 105 +++++++ ...aConfluentSchemaRegistryAvroMessageDecoder.java | 33 ++- .../pinot/spi/stream/StreamMessageDecoder.java | 9 +- 5 files changed, 481 insertions(+), 4 deletions(-) diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml index a67a222d5b..3da652cff0 100644 --- a/pinot-integration-tests/pom.xml +++ b/pinot-integration-tests/pom.xml @@ -37,6 +37,7 @@ <localstack-utils.version>0.2.19</localstack-utils.version> <awaitility.version>3.0.0</awaitility.version> <aws.sdk.version>2.14.28</aws.sdk.version> + <testcontainers.version>1.17.3</testcontainers.version> </properties> <build> @@ -323,6 +324,30 @@ <artifactId>pinot-yammer</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-confluent-avro</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>net.java.dev.jna</groupId> + <artifactId>jna</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>cloud.localstack</groupId> <artifactId>localstack-utils</artifactId> diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java new file mode 100644 index 0000000000..21627741f1 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.google.common.primitives.Longs; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.FileUtils; +import org.apache.http.HttpStatus; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter; +import org.apache.pinot.plugin.inputformat.avro.AvroUtils; +import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer. 
+ * TODO: Add separate module-level tests and remove the randomness of this test + */ +public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest + extends RealtimeClusterIntegrationTest { + private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; + private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = + "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; + private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime"); + private static final long RANDOM_SEED = System.currentTimeMillis(); + private static final Random RANDOM = new Random(RANDOM_SEED); + + private final boolean _isDirectAlloc = RANDOM.nextBoolean(); + private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); + private final boolean _enableSplitCommit = RANDOM.nextBoolean(); + private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); + private final long _startTime = System.currentTimeMillis(); + private SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry; + + @Override + protected int getNumKafkaBrokers() { + return 1; + } + + @Override + protected void startKafka() { + super.startKafka(); + startSchemaRegistry(); + } + + @Override + protected void stopKafka() { + stopSchemaRegistry(); + super.stopKafka(); + } + + private void startSchemaRegistry() { + if (_schemaRegistry == null) { + _schemaRegistry = SchemaRegistryStarter.startLocalInstance(SchemaRegistryStarter.DEFAULT_PORT); + } + } + + private void stopSchemaRegistry() { + try { + if (_schemaRegistry != null) { + _schemaRegistry.stop(); + _schemaRegistry = null; + } + } catch (Exception e) { + // Swallow exceptions + } + } + + @Override + protected void pushAvroIntoKafka(List<File> avroFiles) + throws Exception { + Properties avroProducerProps = new Properties(); + avroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); + avroProducerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl()); + avroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + avroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "io.confluent.kafka.serializers.KafkaAvroSerializer"); + Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps); + + Properties nonAvroProducerProps = new Properties(); + nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); + nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps); + + if (injectTombstones()) { + // publish lots of tombstones to livelock the consumer if it can't handle this properly + for (int i = 0; i < 1000; i++) { + // publish a tombstone first + nonAvroProducer.send( + new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null)); + } + } + for (File avroFile : avroFiles) { + try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) { + for (GenericRecord genericRecord : reader) { + byte[] keyBytes = (getPartitionColumn() == null) ? 
Longs.toByteArray(System.currentTimeMillis()) + : (genericRecord.get(getPartitionColumn())).toString().getBytes(); + // Ignore getKafkaMessageHeader() + nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8))); + avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord)); + } + } + } + } + + @Override + protected Map<String, String> getStreamConfigs() { + Map<String, String> streamConfigMap = super.getStreamConfigs(); + String streamType = "kafka"; + streamConfigMap.put( + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), + KafkaConfluentSchemaRegistryAvroMessageDecoder.class.getName()); + streamConfigMap.put("stream.kafka.decoder.prop.schema.registry.rest.url", _schemaRegistry.getUrl()); + return streamConfigMap; + } + + @Override + protected boolean injectTombstones() { + return true; + } + + @Override + protected boolean useLlc() { + return true; + } + + @Override + protected String getLoadMode() { + return ReadMode.mmap.name(); + } + + @Override + public void startController() + throws Exception { + Map<String, Object> properties = getDefaultControllerConfiguration(); + + properties.put(ControllerConf.ALLOW_HLC_TABLES, false); + properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit); + + startController(properties); + enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); + } + + @Override + protected void overrideServerConf(PinotConfiguration configuration) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc); + if (_isConsumerDirConfigured) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); + } + if (_enableSplitCommit) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); + } + } + + @Override + protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig) + throws Exception { + if (!_tarDir.exists()) { + _tarDir.mkdir(); + } + if (!_segmentDir.exists()) { + _segmentDir.mkdir(); + } + + // create segments out of the avro files (segments will be placed in _tarDir) + List<File> copyOfAvroFiles = new ArrayList<>(avroFiles); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + + // upload segments to controller + uploadSegmentsToController(getTableName(), _tarDir, false, false); + + // upload the first segment again to verify refresh + uploadSegmentsToController(getTableName(), _tarDir, true, false); + + // upload the first segment again to verify refresh with different segment crc + uploadSegmentsToController(getTableName(), _tarDir, true, true); + + // add avro files to the original list so H2 will have the uploaded data as well + avroFiles.addAll(copyOfAvroFiles); + } + + private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc) + throws Exception { + File[] segmentTarFiles = tarDir.listFiles(); + assertNotNull(segmentTarFiles); + int numSegments = segmentTarFiles.length; + assertTrue(numSegments > 0); + if (onlyFirstSegment) { + numSegments = 1; + } + URI uploadSegmentHttpURI = 
FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort); + try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { + if (numSegments == 1) { + File segmentTarFile = segmentTarFiles[0]; + if (changeCrc) { + changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString()); + } + assertEquals( + fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, + tableName, TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK); + } else { + // Upload segments in parallel + ExecutorService executorService = Executors.newFixedThreadPool(numSegments); + List<Future<Integer>> futures = new ArrayList<>(numSegments); + for (File segmentTarFile : segmentTarFiles) { + futures.add(executorService.submit( + () -> fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), + segmentTarFile, tableName, TableType.REALTIME).getStatusCode())); + } + executorService.shutdown(); + for (Future<Integer> future : futures) { + assertEquals((int) future.get(), HttpStatus.SC_OK); + } + } + } + } + + private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) { + int startIdx = segmentFilePath.indexOf("mytable_"); + int endIdx = segmentFilePath.indexOf(".tar.gz"); + String segmentName = segmentFilePath.substring(startIdx, endIdx); + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); + segmentZKMetadata.setCrc(111L); + _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata); + } + + @Override + protected long getCountStarResult() { + // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint + return super.getCountStarResult() * 2; + } + + @BeforeClass + @Override + public void setUp() + throws Exception { + System.out.println(format( + "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, " + + "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, + _enableSplitCommit, _enableLeadControllerResource)); + + // Remove the consumer directory + FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY)); + + super.setUp(); + } + + @AfterClass + @Override + public void tearDown() + throws Exception { + FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY)); + super.tearDown(); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java new file mode 100644 index 0000000000..9d5c0ab53b --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/kafka/schemaregistry/SchemaRegistryStarter.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.kafka.schemaregistry; + +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + + +public class SchemaRegistryStarter { + public static final int DEFAULT_PORT = 8081; + private static final String CONFLUENT_PLATFORM_VERSION = "7.2.0"; + private static final DockerImageName KAFKA_DOCKER_IMAGE_NAME = + DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION); + private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE_NAME = + DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION); + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryStarter.class); + + private SchemaRegistryStarter() { + } + + public static KafkaSchemaRegistryInstance startLocalInstance(int port) { + KafkaSchemaRegistryInstance kafkaSchemaRegistry = new KafkaSchemaRegistryInstance(port); + kafkaSchemaRegistry.start(); + return kafkaSchemaRegistry; + } + + public static class KafkaSchemaRegistryInstance { + private final int _port; + public KafkaContainer _kafkaContainer; + private Network _network; + private GenericContainer _schemaRegistryContainer; + + private KafkaSchemaRegistryInstance(int port) { + _port = port; + } + + public String getUrl() { + return "http://" + _schemaRegistryContainer.getHost() + ":" + _schemaRegistryContainer.getMappedPort(_port); + } + + public void start() { + LOGGER.info("Starting schema registry"); + if (_kafkaContainer != null || _schemaRegistryContainer != null) { + throw new IllegalStateException("Schema registry is already running"); + } + + _network = Network.newNetwork(); + + _kafkaContainer = new KafkaContainer(KAFKA_DOCKER_IMAGE_NAME).withNetwork(_network).withNetworkAliases("kafka") + .withCreateContainerCmdModifier(it -> it.withHostName("kafka")).waitingFor(Wait.forListeningPort()); + _kafkaContainer.start(); + + Map<String, String> schemaRegistryProps = new HashMap<>(); + schemaRegistryProps.put("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092"); + schemaRegistryProps.put("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry"); + schemaRegistryProps.put("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + _port); + schemaRegistryProps.put("SCHEMA_REGISTRY_DEBUG", "true"); + _schemaRegistryContainer = + new GenericContainer(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME).dependsOn(_kafkaContainer).withNetwork(_network) + .withNetworkAliases("schemaregistry").withEnv(schemaRegistryProps).withExposedPorts(_port) + .waitingFor(Wait.forListeningPort()); + _schemaRegistryContainer.start(); + } + + public void stop() { + LOGGER.info("Stopping schema registry"); + if (_schemaRegistryContainer != null) { + _schemaRegistryContainer.stop(); + _schemaRegistryContainer = null; + } + + if (_kafkaContainer != null) { + _kafkaContainer.stop(); + 
_kafkaContainer = null; + } + + if (_network != null) { + _network.close(); + } + } + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java index 20c7981b44..f666d6401b 100644 --- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java +++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java @@ -31,6 +31,7 @@ import org.apache.avro.generic.GenericData.Record; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractorConfig; @@ -38,6 +39,8 @@ import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordExtractor; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.stream.StreamMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkState; @@ -48,6 +51,7 @@ import static com.google.common.base.Preconditions.checkState; * NOTE: Do not use schema in the implementation, as schema will be removed from the params */ public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMessageDecoder<byte[]> { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConfluentSchemaRegistryAvroMessageDecoder.class); private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url"; private static final String SCHEMA_REGISTRY_OPTS_PREFIX = "schema.registry."; private KafkaAvroDeserializer _deserializer; @@ -103,12 +107,37 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMes @Override public GenericRow decode(byte[] payload, GenericRow destination) { - Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload); - return _avroRecordExtractor.extract(avroRecord, destination); + try { + Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload); + return _avroRecordExtractor.extract(avroRecord, destination); + } catch (RuntimeException e) { + ignoreOrRethrowException(e); + return null; + } } @Override public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) { return decode(Arrays.copyOfRange(payload, offset, offset + length), destination); } + + /** + * This method handles specific serialisation exceptions. If the exception cannot be ignored the method + * re-throws the exception. 
+   *
+   * @param e exception to handle
+   */
+  private void ignoreOrRethrowException(RuntimeException e) {
+    if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+      // Do nothing, the message is not an Avro message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+      return;
+    }
+    throw e;
+  }
+
+  private boolean isUnknownMagicByte(Throwable e) {
+    return e != null && e instanceof SerializationException && e.getMessage() != null && e.getMessage().toLowerCase()
+        .contains("unknown magic byte");
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index d2989c8ae8..89312f06b6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
 
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -52,8 +53,10 @@ public interface StreamMessageDecoder<T> {
    * Decodes a row.
    *
    * @param payload The buffer from which to read the row.
-   * @return A new row decoded from the buffer
+   * @return A new row decoded from the buffer. If the returned value is <code>null</code> the row is dropped from the
+   *         segment.
    */
+  @Nullable
   GenericRow decode(T payload, GenericRow destination);
 
   /**
@@ -63,7 +66,9 @@ public interface StreamMessageDecoder<T> {
    * @param offset The offset into the array from which the row contents starts
    * @param length The length of the row contents in bytes
    * @param destination The {@link GenericRow} to write the decoded row into
-   * @return A new row decoded from the buffer
+   * @return A new row decoded from the buffer If the returned value is <code>null</code> the row is dropped from the
+   *         segment.
    */
+  @Nullable
   GenericRow decode(T payload, int offset, int length, GenericRow destination);
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
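
For readers unfamiliar with the failure mode this commit handles: the Confluent Avro serializer frames every message as a single magic byte (0x0), a 4-byte schema registry ID, and then the Avro binary payload. A record that does not start with that framing (such as the "Rubbish" bytes the new integration test publishes alongside valid Avro records) makes KafkaAvroDeserializer throw a SerializationException whose message contains "unknown magic byte", which the decoder change above now catches and treats as an undecodable row. The sketch below is illustrative only; the class and method names are invented and are not part of this commit.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the Confluent wire format that KafkaAvroDeserializer expects.
// Payloads that do not start with the magic byte trigger the "Unknown magic byte!"
// SerializationException that the decoder now ignores instead of failing ingestion.
public final class ConfluentWireFormatSketch {
  private static final byte MAGIC_BYTE = 0x0;
  private static final int HEADER_SIZE = 1 + Integer.BYTES; // magic byte + 4-byte schema id

  // Returns true if the payload is at least framed like a Confluent Avro message:
  // [magic byte (1)][schema registry id (4)][Avro binary payload (...)]
  static boolean looksLikeConfluentAvro(byte[] payload) {
    if (payload == null || payload.length < HEADER_SIZE) {
      return false;
    }
    return ByteBuffer.wrap(payload).get() == MAGIC_BYTE;
  }

  public static void main(String[] args) {
    // The non-Avro "Rubbish" records pushed by the test fail this check
    System.out.println(looksLikeConfluentAvro("Rubbish".getBytes(StandardCharsets.UTF_8))); // false
  }
}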
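
As a usage note, the integration test above wires this decoder into a realtime table through two stream properties. The sketch below mirrors its getStreamConfigs(); the wrapping class name is invented, the registry URL is a caller-supplied value, and the resolved key for the decoder class is assumed to be stream.kafka.decoder.class.name.

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
import org.apache.pinot.spi.stream.StreamConfigProperties;

// Sketch of the stream-level properties a realtime table needs to use the
// schema-registry-aware decoder, mirroring the integration test's getStreamConfigs().
public final class ConfluentDecoderStreamConfigSketch {
  public static Map<String, String> decoderStreamConfigs(String schemaRegistryUrl) {
    Map<String, String> streamConfigMap = new HashMap<>();
    // Assumed to resolve to "stream.kafka.decoder.class.name"
    streamConfigMap.put(
        StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_DECODER_CLASS),
        KafkaConfluentSchemaRegistryAvroMessageDecoder.class.getName());
    // Points the decoder at the Confluent schema registry REST endpoint
    streamConfigMap.put("stream.kafka.decoder.prop.schema.registry.rest.url", schemaRegistryUrl);
    return streamConfigMap;
  }
}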