This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 999ecfa7227 [input-format][avro] SimpleAvroMessageDecoder: add
optional 'leading.bytes.to.strip' to support header-prefixed payloads (e.g.,
Confluent magic+schemaId). (#17077)
999ecfa7227 is described below
commit 999ecfa72276b238911b9e8d164dc0314228d468
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Oct 27 19:20:47 2025 -0700
[input-format][avro] SimpleAvroMessageDecoder: add optional
'leading.bytes.to.strip' to support header-prefixed payloads (e.g., Confluent
magic+schemaId). (#17077)
---
.../inputformat/avro/SimpleAvroMessageDecoder.java | 23 +++-
.../avro/SimpleAvroMessageDecoderTest.java | 136 +++++++++++++++++++++
2 files changed, 158 insertions(+), 1 deletion(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
index a1673f95977..21cd725f66c 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
@@ -41,12 +41,14 @@ import org.apache.pinot.spi.stream.StreamMessageDecoder;
@NotThreadSafe
public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final String SCHEMA = "schema";
+ private static final String LEADING_BYTES_TO_STRIP =
"leading.bytes.to.strip";
private org.apache.avro.Schema _avroSchema;
private DatumReader<GenericData.Record> _datumReader;
private RecordExtractor<GenericData.Record> _avroRecordExtractor;
private BinaryDecoder _binaryDecoderToReuse;
private GenericData.Record _avroRecordToReuse;
+ private int _leadingBytesToStrip = 0;
@Override
public void init(Map<String, String> props, Set<String> fieldsToRead, String
topicName)
@@ -54,6 +56,18 @@ public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
Preconditions.checkState(props.containsKey(SCHEMA), "Avro schema must be
provided");
_avroSchema = new org.apache.avro.Schema.Parser().parse(props.get(SCHEMA));
_datumReader = new GenericDatumReader<>(_avroSchema);
+
+ // Optional: Strip leading header bytes before decoding (e.g., magic byte
+ schema id)
+ String leadingBytes = props.get(LEADING_BYTES_TO_STRIP);
+ if (leadingBytes != null && !leadingBytes.isEmpty()) {
+ try {
+ _leadingBytesToStrip = Integer.parseInt(leadingBytes);
+ Preconditions.checkState(_leadingBytesToStrip >= 0, "'%s' must be
non-negative", LEADING_BYTES_TO_STRIP);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid integer for '" +
LEADING_BYTES_TO_STRIP + "': " + leadingBytes,
+ e);
+ }
+ }
String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
String recordExtractorConfigClass =
props.get(RECORD_EXTRACTOR_CONFIG_CONFIG_KEY);
// Backward compatibility to support Avro by default
@@ -87,7 +101,14 @@ public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
*/
@Override
public GenericRow decode(byte[] payload, int offset, int length, GenericRow
destination) {
- _binaryDecoderToReuse = DecoderFactory.get().binaryDecoder(payload,
offset, length, _binaryDecoderToReuse);
+ int effectiveOffset = offset + _leadingBytesToStrip;
+ int effectiveLength = length - _leadingBytesToStrip;
+ if (effectiveLength < 0) {
+ throw new IllegalArgumentException("Configured '" +
LEADING_BYTES_TO_STRIP + "' (" + _leadingBytesToStrip
+ + ") exceeds available payload length (" + length + ")");
+ }
+ _binaryDecoderToReuse = DecoderFactory.get().binaryDecoder(payload,
effectiveOffset, effectiveLength,
+ _binaryDecoderToReuse);
try {
_avroRecordToReuse = _datumReader.read(_avroRecordToReuse,
_binaryDecoderToReuse);
} catch (Exception e) {
diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/test/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-avro/src/test/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoderTest.java
new file mode 100644
index 00000000000..d7428af1917
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-avro/src/test/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoderTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class SimpleAvroMessageDecoderTest {
+ private Schema _schema;
+ private byte[] _encodedRecord;
+ private GenericRow _destination;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ String schemaStr = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"TestRecord\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"id\", \"type\": \"int\"},\n"
+ + " {\"name\": \"name\", \"type\": \"string\"}\n"
+ + " ]\n"
+ + "}";
+ _schema = new Schema.Parser().parse(schemaStr);
+
+ GenericData.Record record = new GenericData.Record(_schema);
+ record.put("id", 42);
+ record.put("name", "alice");
+
+ GenericDatumWriter<GenericData.Record> writer = new
GenericDatumWriter<>(_schema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ var encoder = EncoderFactory.get().binaryEncoder(baos, null);
+ writer.write(record, encoder);
+ encoder.flush();
+ _encodedRecord = baos.toByteArray();
+
+ _destination = new GenericRow();
+ }
+
+ private Map<String, String> baseProps() {
+ Map<String, String> props = new HashMap<>();
+ props.put("schema", _schema.toString());
+ return props;
+ }
+
+ @Test
+ public void testDecodeWithoutHeader()
+ throws Exception {
+ SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+ decoder.init(baseProps(), Set.of(), "topic");
+ GenericRow row = decoder.decode(_encodedRecord, _destination);
+ Assert.assertEquals(row.getValue("id"), 42);
+ Assert.assertEquals(row.getValue("name"), "alice");
+ }
+
+ @Test
+ public void testDecodeWithLeadingBytesStripped()
+ throws Exception {
+ byte[] header = new byte[]{1, 2, 3, 4};
+ byte[] payloadWithHeader = new byte[header.length + _encodedRecord.length];
+ System.arraycopy(header, 0, payloadWithHeader, 0, header.length);
+ System.arraycopy(_encodedRecord, 0, payloadWithHeader, header.length,
_encodedRecord.length);
+
+ Map<String, String> props = baseProps();
+ props.put("leading.bytes.to.strip", String.valueOf(header.length));
+
+ SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+ decoder.init(props, Set.of(), "topic");
+ GenericRow row = decoder.decode(payloadWithHeader, _destination);
+ Assert.assertEquals(row.getValue("id"), 42);
+ Assert.assertEquals(row.getValue("name"), "alice");
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testNegativeLeadingBytesRejected()
+ throws Exception {
+ Map<String, String> props = baseProps();
+ props.put("leading.bytes.to.strip", "-1");
+ SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+ decoder.init(props, Set.of(), "topic");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testLeadingBytesExceedsLength()
+ throws Exception {
+ Map<String, String> props = baseProps();
+ props.put("leading.bytes.to.strip", "10");
+ SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+ decoder.init(props, Set.of(), "topic");
+ decoder.decode(new byte[]{0, 1, 2, 3, 4}, new GenericRow());
+ }
+
+ @Test
+ public void testDecodeWithOffsetAndLength()
+ throws Exception {
+ byte[] prefix = new byte[]{9, 9};
+ byte[] suffix = new byte[]{8, 8, 8};
+ byte[] wrapped = new byte[prefix.length + _encodedRecord.length +
suffix.length];
+ System.arraycopy(prefix, 0, wrapped, 0, prefix.length);
+ System.arraycopy(_encodedRecord, 0, wrapped, prefix.length,
_encodedRecord.length);
+ System.arraycopy(suffix, 0, wrapped, prefix.length +
_encodedRecord.length, suffix.length);
+
+ SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+ decoder.init(baseProps(), Set.of(), "topic");
+ GenericRow row = decoder.decode(wrapped, prefix.length,
_encodedRecord.length, _destination);
+ Assert.assertEquals(row.getValue("id"), 42);
+ Assert.assertEquals(row.getValue("name"), "alice");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org