This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 999ecfa7227 [input-format][avro] SimpleAvroMessageDecoder: add optional 'leading.bytes.to.strip' to support header-prefixed payloads (e.g., Confluent magic+schemaId). (#17077)
999ecfa7227 is described below

commit 999ecfa72276b238911b9e8d164dc0314228d468
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Oct 27 19:20:47 2025 -0700

    [input-format][avro] SimpleAvroMessageDecoder: add optional 'leading.bytes.to.strip' to support header-prefixed payloads (e.g., Confluent magic+schemaId). (#17077)
---
 .../inputformat/avro/SimpleAvroMessageDecoder.java |  23 +++-
 .../avro/SimpleAvroMessageDecoderTest.java         | 136 +++++++++++++++++++++
 2 files changed, 158 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
index a1673f95977..21cd725f66c 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
@@ -41,12 +41,14 @@ import org.apache.pinot.spi.stream.StreamMessageDecoder;
 @NotThreadSafe
 public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
   private static final String SCHEMA = "schema";
+  private static final String LEADING_BYTES_TO_STRIP = "leading.bytes.to.strip";
 
   private org.apache.avro.Schema _avroSchema;
   private DatumReader<GenericData.Record> _datumReader;
   private RecordExtractor<GenericData.Record> _avroRecordExtractor;
   private BinaryDecoder _binaryDecoderToReuse;
   private GenericData.Record _avroRecordToReuse;
+  private int _leadingBytesToStrip = 0;
 
   @Override
   public void init(Map<String, String> props, Set<String> fieldsToRead, String topicName)
@@ -54,6 +56,18 @@ public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
     Preconditions.checkState(props.containsKey(SCHEMA), "Avro schema must be provided");
     _avroSchema = new org.apache.avro.Schema.Parser().parse(props.get(SCHEMA));
     _datumReader = new GenericDatumReader<>(_avroSchema);
+
+    // Optional: Strip leading header bytes before decoding (e.g., magic byte + schema id)
+    String leadingBytes = props.get(LEADING_BYTES_TO_STRIP);
+    if (leadingBytes != null && !leadingBytes.isEmpty()) {
+      try {
+        _leadingBytesToStrip = Integer.parseInt(leadingBytes);
+        Preconditions.checkState(_leadingBytesToStrip >= 0, "'%s' must be non-negative", LEADING_BYTES_TO_STRIP);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Invalid integer for '" + LEADING_BYTES_TO_STRIP + "': " + leadingBytes,
+            e);
+      }
+    }
     String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
     String recordExtractorConfigClass = props.get(RECORD_EXTRACTOR_CONFIG_CONFIG_KEY);
     // Backward compatibility to support Avro by default
@@ -87,7 +101,14 @@ public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
    */
   @Override
   public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
-    _binaryDecoderToReuse = DecoderFactory.get().binaryDecoder(payload, offset, length, _binaryDecoderToReuse);
+    int effectiveOffset = offset + _leadingBytesToStrip;
+    int effectiveLength = length - _leadingBytesToStrip;
+    if (effectiveLength < 0) {
+      throw new IllegalArgumentException("Configured '" + LEADING_BYTES_TO_STRIP + "' (" + _leadingBytesToStrip
+          + ") exceeds available payload length (" + length + ")");
+    }
+    _binaryDecoderToReuse = DecoderFactory.get().binaryDecoder(payload, effectiveOffset, effectiveLength,
+        _binaryDecoderToReuse);
     try {
       _avroRecordToReuse = _datumReader.read(_avroRecordToReuse, _binaryDecoderToReuse);
     } catch (Exception e) {
diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/test/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoderTest.java b/pinot-plugins/pinot-input-format/pinot-avro/src/test/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoderTest.java
new file mode 100644
index 00000000000..d7428af1917
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-avro/src/test/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoderTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class SimpleAvroMessageDecoderTest {
+  private Schema _schema;
+  private byte[] _encodedRecord;
+  private GenericRow _destination;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    String schemaStr = "{\n"
+        + "  \"type\": \"record\",\n"
+        + "  \"name\": \"TestRecord\",\n"
+        + "  \"fields\": [\n"
+        + "    {\"name\": \"id\", \"type\": \"int\"},\n"
+        + "    {\"name\": \"name\", \"type\": \"string\"}\n"
+        + "  ]\n"
+        + "}";
+    _schema = new Schema.Parser().parse(schemaStr);
+
+    GenericData.Record record = new GenericData.Record(_schema);
+    record.put("id", 42);
+    record.put("name", "alice");
+
+    GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(_schema);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    var encoder = EncoderFactory.get().binaryEncoder(baos, null);
+    writer.write(record, encoder);
+    encoder.flush();
+    _encodedRecord = baos.toByteArray();
+
+    _destination = new GenericRow();
+  }
+
+  private Map<String, String> baseProps() {
+    Map<String, String> props = new HashMap<>();
+    props.put("schema", _schema.toString());
+    return props;
+  }
+
+  @Test
+  public void testDecodeWithoutHeader()
+      throws Exception {
+    SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+    decoder.init(baseProps(), Set.of(), "topic");
+    GenericRow row = decoder.decode(_encodedRecord, _destination);
+    Assert.assertEquals(row.getValue("id"), 42);
+    Assert.assertEquals(row.getValue("name"), "alice");
+  }
+
+  @Test
+  public void testDecodeWithLeadingBytesStripped()
+      throws Exception {
+    byte[] header = new byte[]{1, 2, 3, 4};
+    byte[] payloadWithHeader = new byte[header.length + _encodedRecord.length];
+    System.arraycopy(header, 0, payloadWithHeader, 0, header.length);
+    System.arraycopy(_encodedRecord, 0, payloadWithHeader, header.length, _encodedRecord.length);
+
+    Map<String, String> props = baseProps();
+    props.put("leading.bytes.to.strip", String.valueOf(header.length));
+
+    SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+    decoder.init(props, Set.of(), "topic");
+    GenericRow row = decoder.decode(payloadWithHeader, _destination);
+    Assert.assertEquals(row.getValue("id"), 42);
+    Assert.assertEquals(row.getValue("name"), "alice");
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testNegativeLeadingBytesRejected()
+      throws Exception {
+    Map<String, String> props = baseProps();
+    props.put("leading.bytes.to.strip", "-1");
+    SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+    decoder.init(props, Set.of(), "topic");
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testLeadingBytesExceedsLength()
+      throws Exception {
+    Map<String, String> props = baseProps();
+    props.put("leading.bytes.to.strip", "10");
+    SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+    decoder.init(props, Set.of(), "topic");
+    decoder.decode(new byte[]{0, 1, 2, 3, 4}, new GenericRow());
+  }
+
+  @Test
+  public void testDecodeWithOffsetAndLength()
+      throws Exception {
+    byte[] prefix = new byte[]{9, 9};
+    byte[] suffix = new byte[]{8, 8, 8};
+    byte[] wrapped = new byte[prefix.length + _encodedRecord.length + suffix.length];
+    System.arraycopy(prefix, 0, wrapped, 0, prefix.length);
+    System.arraycopy(_encodedRecord, 0, wrapped, prefix.length, _encodedRecord.length);
+    System.arraycopy(suffix, 0, wrapped, prefix.length + _encodedRecord.length, suffix.length);
+
+    SimpleAvroMessageDecoder decoder = new SimpleAvroMessageDecoder();
+    decoder.init(baseProps(), Set.of(), "topic");
+    GenericRow row = decoder.decode(wrapped, prefix.length, _encodedRecord.length, _destination);
+    Assert.assertEquals(row.getValue("id"), 42);
+    Assert.assertEquals(row.getValue("name"), "alice");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to