This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e7f3a63a04a Add Apache Arrow format decoder to Pinot (#17031)
e7f3a63a04a is described below

commit e7f3a63a04ab0ecdcc5e617c25c5f4b55c822f66
Author: lnbest0707 <[email protected]>
AuthorDate: Tue Oct 21 09:52:09 2025 -0700

    Add Apache Arrow format decoder to Pinot (#17031)
    
    * Add Apache Arrow format decoder to Pinot
    
    * Resolve comments
---
 .../pinot-input-format/{ => pinot-arrow}/pom.xml   |  62 +-
 .../inputformat/arrow/ArrowMessageDecoder.java     | 114 +++
 .../arrow/ArrowToGenericRowConverter.java          | 226 ++++++
 .../inputformat/arrow/ArrowMessageDecoderTest.java | 762 +++++++++++++++++++++
 .../inputformat/arrow/util/ArrowTestDataUtil.java  | 607 ++++++++++++++++
 pinot-plugins/pinot-input-format/pom.xml           |   1 +
 6 files changed, 1747 insertions(+), 25 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pom.xml 
b/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml
similarity index 53%
copy from pinot-plugins/pinot-input-format/pom.xml
copy to pinot-plugins/pinot-input-format/pinot-arrow/pom.xml
index e9aabe0079f..746c69b49b8 100644
--- a/pinot-plugins/pinot-input-format/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-arrow/pom.xml
@@ -22,41 +22,53 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>pinot-plugins</artifactId>
+    <artifactId>pinot-input-format</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>1.5.0-SNAPSHOT</version>
   </parent>
 
-  <artifactId>pinot-input-format</artifactId>
-  <packaging>pom</packaging>
-  <name>Pinot Input Format</name>
+  <artifactId>pinot-arrow</artifactId>
+  <name>Pinot Arrow</name>
   <url>https://pinot.apache.org/</url>
   <properties>
-    <pinot.root>${basedir}/../..</pinot.root>
-    <plugin.type>pinot-input-format</plugin.type>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <shade.phase.prop>package</shade.phase.prop>
   </properties>
 
-  <modules>
-    <module>pinot-avro</module>
-    <module>pinot-avro-base</module>
-    <module>pinot-clp-log</module>
-    <module>pinot-confluent-avro</module>
-    <module>pinot-confluent-json</module>
-    <module>pinot-confluent-protobuf</module>
-    <module>pinot-orc</module>
-    <module>pinot-json</module>
-    <module>pinot-parquet</module>
-    <module>pinot-csv</module>
-    <module>pinot-thrift</module>
-    <module>pinot-protobuf</module>
-  </modules>
-
   <dependencies>
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-spi</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-compression</artifactId>
+      <version>${arrow.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>${arrow.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${arrow.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-netty</artifactId>
+      <version>${arrow.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${arrow.version}</version>
     </dependency>
   </dependencies>
+
+  <profiles>
+    <profile>
+      <id>pinot-fastdev</id>
+      <properties>
+        <shade.phase.prop>none</shade.phase.prop>
+      </properties>
+    </profile>
+  </profiles>
 </project>
diff --git 
a/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java
 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java
new file mode 100644
index 00000000000..297e0463829
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+
+import java.io.ByteArrayInputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ArrowMessageDecoder is used to decode Apache Arrow IPC format messages into 
Pinot GenericRow.
+ * This decoder handles Arrow streaming format and converts Arrow data to 
Pinot's columnar format.
+ */
+public class ArrowMessageDecoder implements StreamMessageDecoder<byte[]> {
+  public static final String ARROW_ALLOCATOR_LIMIT = "arrow.allocator.limit";
+  public static final String DEFAULT_ALLOCATOR_LIMIT = "268435456"; // 256MB 
default
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ArrowMessageDecoder.class);
+
+  private String _streamTopicName;
+  private RootAllocator _allocator;
+  private ArrowToGenericRowConverter _converter;
+
+  @Override
+  public void init(Map<String, String> props, Set<String> fieldsToRead, String 
topicName)
+      throws Exception {
+    _streamTopicName = topicName;
+
+    // Initialize Arrow allocator with configurable memory limit
+    long allocatorLimit =
+        Long.parseLong(props.getOrDefault(ARROW_ALLOCATOR_LIMIT, 
DEFAULT_ALLOCATOR_LIMIT));
+    _allocator = new RootAllocator(allocatorLimit);
+
+    // Initialize Arrow to GenericRow converter (processes all fields)
+    _converter = new ArrowToGenericRowConverter();
+
+    logger.info(
+        "Initialized ArrowMessageDecoder for topic: {} with allocator limit: 
{} bytes",
+        topicName,
+        allocatorLimit);
+  }
+
+  @Nullable
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(payload);
+        ReadableByteChannel channel = Channels.newChannel(inputStream);
+        ArrowStreamReader reader = new ArrowStreamReader(channel, _allocator)) 
{
+
+      // Read the Arrow schema and data
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      if (!reader.loadNextBatch()) {
+        logger.warn("No data found in Arrow message for topic: {}", 
_streamTopicName);
+        return null;
+      }
+
+      // Convert Arrow data to GenericRow using converter
+      GenericRow row = _converter.convert(reader, root, destination);
+
+      return row;
+    } catch (Exception e) {
+      logger.error(
+          "Error decoding Arrow message for stream topic {} : {}",
+          _streamTopicName,
+          Arrays.toString(payload),
+          e);
+      return null;
+    }
+  }
+
+  @Nullable
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow 
destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), 
destination);
+  }
+
+  /** Clean up resources */
+  public void close() {
+    if (_allocator != null) {
+      try {
+        _allocator.close();
+      } catch (Exception e) {
+        logger.warn("Error closing Arrow allocator", e);
+      }
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java
 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java
new file mode 100644
index 00000000000..2cdafc49478
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.util.Text;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for converting Apache Arrow VectorSchemaRoot to Pinot {@code 
GenericRow}. Processes
+ * all fields and handles multiple rows from Arrow batch.
+ */
+public class ArrowToGenericRowConverter {
+  private static final Logger logger = 
LoggerFactory.getLogger(ArrowToGenericRowConverter.class);
+
+  /** Default constructor that processes all fields from Arrow batch. */
+  public ArrowToGenericRowConverter() {
+    logger.debug("ArrowToGenericRowConverter created for processing all 
fields");
+  }
+
+  /**
+   * Converts an Arrow VectorSchemaRoot to a Pinot {@code GenericRow}. 
Processes ALL rows from the
+   * Arrow batch and stores them as a list using MULTIPLE_RECORDS_KEY.
+   *
+   * @param reader ArrowStreamReader containing the data
+   * @param root Arrow VectorSchemaRoot containing the data
+   * @param destination Optional destination {@code GenericRow}, will create 
new if null
+   * @return {@code GenericRow} containing {@code List<GenericRow>} with all 
converted rows, or null
+   *     if no data available
+   */
+  @Nullable
+  public GenericRow convert(
+      ArrowStreamReader reader, VectorSchemaRoot root, GenericRow destination) 
{
+    if (root == null) {
+      logger.warn("Cannot convert null VectorSchemaRoot");
+      return null;
+    }
+
+    if (destination == null) {
+      destination = new GenericRow();
+    }
+
+    int rowCount = root.getRowCount();
+    if (rowCount == 0) {
+      logger.warn("No rows found in Arrow data");
+      return destination;
+    }
+
+    List<GenericRow> rows = new ArrayList<>(rowCount);
+
+    // Process all rows from the Arrow batch
+    for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+      GenericRow row = convertSingleRow(reader, root, rowIndex);
+      if (row != null) {
+        rows.add(row);
+      }
+    }
+
+    if (!rows.isEmpty()) {
+      // Use Pinot's MULTIPLE_RECORDS_KEY to store the list of rows
+      destination.putValue(GenericRow.MULTIPLE_RECORDS_KEY, rows);
+      logger.debug("Converted {} rows from Arrow batch", rows.size());
+    }
+
+    return destination;
+  }
+
+  /**
+   * Converts a single row from Arrow VectorSchemaRoot.
+   *
+   * @param reader ArrowStreamReader containing the data
+   * @param root Arrow VectorSchemaRoot containing the data
+   * @param rowIndex Index of the row to convert (0-based)
+   * @return {@code GenericRow} with converted data, or null if row index is 
invalid
+   */
+  @Nullable
+  private GenericRow convertSingleRow(
+      ArrowStreamReader reader, VectorSchemaRoot root, int rowIndex) {
+    GenericRow row = new GenericRow();
+    int convertedFields = 0;
+
+    // Process all fields in the Arrow schema
+    for (int i = 0; i < root.getFieldVectors().size(); i++) {
+      Object value;
+
+      FieldVector fieldVector = root.getFieldVectors().get(i);
+      String fieldName = fieldVector.getField().getName();
+      try {
+        if (fieldVector.getField().getDictionary() != null) {
+          long dictionaryId = fieldVector.getField().getDictionary().getId();
+          try (ValueVector realFieldVector =
+              DictionaryEncoder.decode(
+                  fieldVector, 
reader.getDictionaryVectors().get(dictionaryId))) {
+            value = realFieldVector.getObject(rowIndex);
+          }
+        } else {
+          value = fieldVector.getObject(rowIndex);
+        }
+        if (value != null) {
+          // Convert Arrow-specific types to Pinot-compatible types
+          Object pinotCompatibleValue = 
convertArrowTypeToPinotCompatible(value);
+          row.putValue(fieldName, pinotCompatibleValue);
+          convertedFields++;
+        }
+      } catch (Exception e) {
+        logger.error("Error extracting value for field: {} at row {}", 
fieldName, rowIndex, e);
+      }
+    }
+
+    logger.debug("Converted {} fields from Arrow row {} to GenericRow", 
convertedFields, rowIndex);
+    return row;
+  }
+
+  /**
+   * Converts Arrow-specific data types to Pinot-compatible types. This method 
handles the
+   * incompatibility issues between Arrow's native data types and what Pinot 
expects.
+   *
+   * @param value The raw value from Arrow fieldVector.getObject()
+   * @return A Pinot-compatible version of the value
+   */
+  @Nullable
+  private Object convertArrowTypeToPinotCompatible(@Nullable Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    // Handle nested List and Map values, including Arrow MapVector's 
representation
+    if (value instanceof List) {
+      List<?> originalList = (List<?>) value;
+      if (!originalList.isEmpty()) {
+        boolean looksLikeMapEntries = true;
+        boolean sawNonNull = false;
+        for (Object entryObj : originalList) {
+          if (entryObj == null) {
+            continue;
+          }
+          sawNonNull = true;
+          if (!(entryObj instanceof Map)) {
+            looksLikeMapEntries = false;
+            break;
+          }
+          @SuppressWarnings("unchecked")
+          Map<Object, Object> entryMap = (Map<Object, Object>) entryObj;
+          if (!entryMap.containsKey(MapVector.KEY_NAME)) {
+            looksLikeMapEntries = false;
+            break;
+          }
+        }
+        if (looksLikeMapEntries && sawNonNull) {
+          Map<String, Object> flattened = new 
LinkedHashMap<>(originalList.size());
+          for (Object entryObj : originalList) {
+            if (entryObj == null) {
+              continue;
+            }
+            @SuppressWarnings("unchecked")
+            Map<Object, Object> entryMap = (Map<Object, Object>) entryObj;
+            Object rawKey = entryMap.get(MapVector.KEY_NAME);
+            Object rawVal = entryMap.get(MapVector.VALUE_NAME);
+            Object convertedKey = convertArrowTypeToPinotCompatible(rawKey);
+            Object convertedVal = convertArrowTypeToPinotCompatible(rawVal);
+            flattened.put(String.valueOf(convertedKey), convertedVal);
+          }
+          return flattened;
+        }
+      }
+
+      List<Object> convertedList = new ArrayList<>(originalList.size());
+      for (Object element : originalList) {
+        convertedList.add(convertArrowTypeToPinotCompatible(element));
+      }
+      return convertedList;
+    }
+
+    // Handle Arrow Text type -> String conversion
+    if (value instanceof Text) {
+      // Arrow VarCharVector.getObject() returns Text objects, but Pinot 
expects String
+      return value.toString();
+    }
+
+    // Handle Arrow LocalDateTime -> java.sql.Timestamp conversion
+    if (value instanceof LocalDateTime) {
+      // Arrow TimeStampMilliVector.getObject() returns LocalDateTime, but 
Pinot expects
+      // java.sql.Timestamp objects for proper timestamp handling and native 
support
+      LocalDateTime dateTime = (LocalDateTime) value;
+      return Timestamp.from(dateTime.toInstant(ZoneOffset.UTC));
+    }
+
+    // Handle other potential Arrow-specific types that might cause issues
+
+    // For primitive types (Integer, Double, Boolean) and other Java standard 
types,
+    // Arrow returns standard Java objects that are already Pinot-compatible
+    return value;
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java
new file mode 100644
index 00000000000..a6b8adc9ca9
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow;
+
+import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.plugin.inputformat.arrow.util.ArrowTestDataUtil;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ArrowMessageDecoderTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ArrowMessageDecoderTest.class);
+
+  /** Initializing and closing must work with an explicit allocator limit as well as with the default. */
+  @Test
+  public void testArrowMessageDecoderWithDifferentAllocatorLimits()
+      throws Exception {
+    String topic = "test-topic-custom";
+    Set<String> fields = Sets.newHashSet("field1");
+
+    // Explicit limit: 64MB
+    Map<String, String> customLimitProps = new HashMap<>();
+    customLimitProps.put(ArrowMessageDecoder.ARROW_ALLOCATOR_LIMIT, "67108864");
+    ArrowMessageDecoder customLimitDecoder = new ArrowMessageDecoder();
+    customLimitDecoder.init(customLimitProps, fields, topic);
+    customLimitDecoder.close();
+
+    // No limit configured: the decoder falls back to its default
+    Map<String, String> defaultProps = new HashMap<>();
+    ArrowMessageDecoder defaultLimitDecoder = new ArrowMessageDecoder();
+    defaultLimitDecoder.init(defaultProps, fields, topic);
+    defaultLimitDecoder.close();
+  }
+
+  /** Calling init() twice on one decoder instance must not fail. */
+  @Test
+  public void testArrowMessageDecoderMultipleInits()
+      throws Exception {
+    String topic = "test-multiple-init";
+    Set<String> fields = Sets.newHashSet("id");
+    Map<String, String> emptyProps = new HashMap<>();
+
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+    for (int attempt = 0; attempt < 2; attempt++) {
+      decoder.init(emptyProps, fields, topic);
+    }
+
+    decoder.close();
+  }
+
+  /** Undecodable payloads (null, garbage, empty) must yield null instead of an exception. */
+  @Test
+  public void testArrowMessageDecodingWithInvalidData()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name", "age");
+    String topicName = "test-arrow-topic";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Explicit charset keeps the bytes independent of the platform default
+      byte[] invalidData1 = "invalid arrow data".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      byte[] invalidData2 = new byte[]{1, 2, 3, 4, 5};
+      byte[] emptyData = new byte[0];
+
+      GenericRow destination = new GenericRow();
+
+      // Should return null for all invalid data types and null
+      assertNull(decoder.decode(null, destination));
+      assertNull(decoder.decode(invalidData1, destination));
+      assertNull(decoder.decode(invalidData2, destination));
+      assertNull(decoder.decode(emptyData, destination));
+
+      // Test with null destination
+      assertNull(decoder.decode(invalidData1, null));
+    } finally {
+      // Release the allocator even when an assertion above fails
+      decoder.close();
+    }
+  }
+
+  /** close() must tolerate being called repeatedly. */
+  @Test
+  public void testArrowMessageDecoderCloseMultipleTimes()
+      throws Exception {
+    String topic = "test-multiple-close";
+    Set<String> fields = Sets.newHashSet("id");
+    Map<String, String> emptyProps = new HashMap<>();
+
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+    decoder.init(emptyProps, fields, topic);
+
+    // Three consecutive closes; none of them may throw
+    for (int attempt = 0; attempt < 3; attempt++) {
+      decoder.close();
+    }
+  }
+
+  /** Decoding into a caller-supplied row must reuse that row and keep the values already in it. */
+  @Test
+  public void testArrowMessageDecoderWithArrowDataAndDestination()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name");
+    String topicName = "test-real-arrow-with-destination";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create real Arrow IPC data
+      byte[] realArrowData = ArrowTestDataUtil.createValidArrowIpcData(1);
+
+      // Test with provided destination containing existing data
+      GenericRow destination = new GenericRow();
+      destination.putValue("existing_field", "existing_value");
+
+      GenericRow result = decoder.decode(realArrowData, destination);
+
+      // TestNG assertions take (actual, expected)
+      // Should return the same destination object
+      assertSame(result, destination);
+
+      // Should preserve existing data
+      assertEquals(result.getValue("existing_field"), "existing_value");
+
+      // Should contain new converted Arrow data
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(rows.size(), 1);
+      assertEquals(rows.get(0).getValue("id"), 1);
+      assertEquals(rows.get(0).getValue("name"), "name_1");
+    } finally {
+      // Release the allocator even when an assertion above fails
+      decoder.close();
+    }
+  }
+
+  /** A stream without data must be handled without an exception. */
+  @Test
+  public void testArrowMessageDecoderWithEmptyData()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name");
+    String topicName = "test-empty-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Test with empty Arrow data (zero batches)
+      byte[] emptyArrowData = ArrowTestDataUtil.createEmptyArrowIpcData();
+      GenericRow result = decoder.decode(emptyArrowData, null);
+
+      // Either null or a result without records is accepted for this edge case
+      if (result != null) {
+        @SuppressWarnings("unchecked")
+        List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+        if (rows != null) {
+          // TestNG assertions take (actual, expected)
+          assertEquals(rows.size(), 0);
+        }
+      }
+    } finally {
+      // Release the allocator even when an assertion above fails
+      decoder.close();
+    }
+  }
+
+  /** Int, string, double, boolean and timestamp columns must all be decoded. */
+  @Test
+  public void testArrowMessageDecoderWithMultipleDataTypes()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name", "price", "active", "timestamp");
+    String topicName = "test-multi-type-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with multiple data types
+      byte[] multiTypeArrowData = ArrowTestDataUtil.createMultiTypeArrowIpcData(3);
+      GenericRow result = decoder.decode(multiTypeArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 3);
+
+      // Verify different data types are correctly handled
+      GenericRow row0 = rows.get(0);
+      assertEquals(row0.getValue("id"), 1);
+      assertEquals(row0.getValue("name").toString(), "product_1");
+      assertEquals((Double) row0.getValue("price"), 10.99, 0.01);
+      assertEquals(row0.getValue("active"), true); // BitVector returns boolean
+      assertNotNull(row0.getValue("timestamp")); // Timestamp should be present
+
+      GenericRow row1 = rows.get(1);
+      assertEquals(row1.getValue("id"), 2);
+      assertEquals(row1.getValue("name").toString(), "product_2");
+      assertEquals((Double) row1.getValue("price"), 15.99, 0.01);
+      assertEquals(row1.getValue("active"), false);
+    } finally {
+      // Release the allocator even when an assertion above fails
+      decoder.close();
+    }
+  }
+
+  @Test
+  public void testArrowMessageDecoderWithBatchContainingMultipleRows()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "batch_num", "value");
+    String topicName = "test-multi-batch-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Create Arrow data with multiple batches - but note: ArrowMessageDecoder 
processes one batch
+    // per decode() call
+    // So we test with a single batch containing multiple rows instead
+    byte[] multiBatchArrowData =
+        ArrowTestDataUtil.createMultiBatchArrowIpcData(1, 3); // 1 batch, 3 
rows
+    GenericRow result = decoder.decode(multiBatchArrowData, null);
+
+    assertNotNull(result);
+    @SuppressWarnings("unchecked")
+    List<GenericRow> rows = (List<GenericRow>) 
result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+    assertNotNull(rows);
+    assertEquals(3, rows.size()); // 1 batch × 3 rows = 3 total rows
+
+    // Verify data from the batch
+    GenericRow row0 = rows.get(0);
+    assertEquals(1, row0.getValue("id"));
+    assertEquals(0, row0.getValue("batch_num"));
+    assertEquals("batch_0_row_0", row0.getValue("value").toString());
+
+    GenericRow row1 = rows.get(1);
+    assertEquals(2, row1.getValue("id"));
+    assertEquals(0, row1.getValue("batch_num"));
+    assertEquals("batch_0_row_1", row1.getValue("value").toString());
+
+    GenericRow row2 = rows.get(2);
+    assertEquals(3, row2.getValue("id"));
+    assertEquals(0, row2.getValue("batch_num"));
+    assertEquals("batch_0_row_2", row2.getValue("value").toString());
+
+    decoder.close();
+  }
+
+  /** Dictionary-encoded columns must come back as their dictionary values, not as indices. */
+  @Test
+  public void testArrowMessageDecoderWithDictionaryEncodedData()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "category", "price");
+    String topicName = "test-dictionary-encoded-arrow-data";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with real dictionary encoding
+      byte[] dictionaryArrowData = ArrowTestDataUtil.createDictionaryEncodedArrowIpcData(8);
+      GenericRow result = decoder.decode(dictionaryArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 8);
+
+      // Verify dictionary-encoded values are properly decoded by ArrowToGenericRowConverter
+      // Dictionary: id=1 -> "Electronics", id=2 -> "Books", id=3 -> "Clothing", id=4 -> "Home"
+      // Data cycles through indices 0,1,2,3,0,1,2,3 which should be resolved to string values
+
+      GenericRow row0 = rows.get(0);
+      assertEquals(row0.getValue("id"), 1);
+      assertEquals(row0.getValue("category"), "Electronics");
+      assertEquals((Double) row0.getValue("price"), 19.99, 0.01);
+
+      GenericRow row1 = rows.get(1);
+      assertEquals(row1.getValue("id"), 2);
+      assertEquals(row1.getValue("category"), "Books");
+      assertEquals((Double) row1.getValue("price"), 29.99, 0.01);
+
+      GenericRow row2 = rows.get(2);
+      assertEquals(row2.getValue("id"), 3);
+      assertEquals(row2.getValue("category"), "Clothing");
+      assertEquals((Double) row2.getValue("price"), 39.99, 0.01);
+
+      GenericRow row3 = rows.get(3);
+      assertEquals(row3.getValue("id"), 4);
+      assertEquals(row3.getValue("category"), "Home");
+      assertEquals((Double) row3.getValue("price"), 49.99, 0.01);
+
+      // Verify cycling continues - row 4 should have same category as row 0
+      GenericRow row4 = rows.get(4);
+      assertEquals(row4.getValue("id"), 5);
+      assertEquals(row4.getValue("category"), "Electronics");
+      assertEquals((Double) row4.getValue("price"), 59.99, 0.01);
+    } finally {
+      // Release the allocator even when an assertion above fails
+      decoder.close();
+    }
+  }
+
+  /** Decoded values must arrive as the Java types Pinot works with (String, Timestamp, ...). */
+  @Test
+  public void testArrowDataTypeCompatibility()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "name", "price", "active", "timestamp");
+    String topicName = "test-data-type-compatibility";
+
+    decoder.init(props, fieldsToRead, topicName);
+    try {
+      // Create Arrow data with multiple data types to verify compatibility
+      byte[] multiTypeArrowData = ArrowTestDataUtil.createMultiTypeArrowIpcData(3);
+      GenericRow result = decoder.decode(multiTypeArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      // TestNG assertions take (actual, expected)
+      assertEquals(rows.size(), 3);
+
+      // Check the actual data types returned by Arrow and verify Pinot compatibility
+      GenericRow row0 = rows.get(0);
+
+      Object idValue = row0.getValue("id");
+      assertNotNull(idValue, "ID should not be null");
+      assertTrue(idValue instanceof Integer, "ID should be Integer compatible");
+
+      Object nameValue = row0.getValue("name");
+      assertNotNull(nameValue, "Name should not be null");
+      // After conversion, Arrow Text should be converted to String for Pinot compatibility
+      assertTrue(nameValue instanceof String, "Name should be String after conversion");
+      assertEquals(nameValue, "product_1");
+      LOGGER.info("Arrow name field successfully converted to String: {}", nameValue);
+
+      Object priceValue = row0.getValue("price");
+      assertNotNull(priceValue, "Price should not be null");
+      assertTrue(priceValue instanceof Double, "Price should be Double compatible");
+
+      Object activeValue = row0.getValue("active");
+      assertNotNull(activeValue, "Active should not be null");
+      // BitVector.getObject() returns Boolean
+      assertTrue(activeValue instanceof Boolean, "Active should be Boolean compatible");
+
+      Object timestampValue = row0.getValue("timestamp");
+      assertNotNull(timestampValue, "Timestamp should not be null");
+      // After conversion, Arrow LocalDateTime should be converted to java.sql.Timestamp for Pinot
+      // compatibility
+      assertTrue(
+          timestampValue instanceof java.sql.Timestamp,
+          "Timestamp should be java.sql.Timestamp after conversion");
+      java.sql.Timestamp ts = (java.sql.Timestamp) timestampValue;
+      assertTrue(ts.getTime() > 0, "Timestamp should be a positive value");
+      LOGGER.info(
+          "Arrow timestamp field successfully converted to java.sql.Timestamp: {}", timestampValue);
+    } finally {
+      // Release the allocator even when an assertion above fails
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes three rows carrying an integer list column ({@code numbers}) and a string list column
+   * ({@code tags}) and verifies the decoded list sizes and elements.
+   */
+  @Test
+  public void testArrowMessageDecoderWithListVectors()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "numbers", "tags");
+    String topicName = "test-list-vectors";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Close the decoder in finally so a failing assertion does not skip the cleanup.
+    try {
+      // Create Arrow data with List vectors
+      byte[] listArrowData = ArrowTestDataUtil.createListArrowIpcData(3);
+      GenericRow result = decoder.decode(listArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(3, rows.size());
+
+      // Verify first row - should have 1 number and 2 tags
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object numbersValue0 = row0.getValue("numbers");
+      assertNotNull(numbersValue0, "Numbers should not be null");
+      assertTrue(numbersValue0 instanceof List);
+      @SuppressWarnings("unchecked")
+      List<Object> numbersList0 = (List<Object>) numbersValue0;
+      assertEquals(1, numbersList0.size());
+      assertEquals(10, numbersList0.get(0));
+
+      Object tagsValue0 = row0.getValue("tags");
+      assertNotNull(tagsValue0, "Tags should not be null");
+      assertTrue(tagsValue0 instanceof List);
+      @SuppressWarnings("unchecked")
+      List<Object> tagsList0 = (List<Object>) tagsValue0;
+      assertEquals(2, tagsList0.size());
+      assertEquals("tag_0_0", tagsList0.get(0).toString());
+      assertEquals("tag_0_1", tagsList0.get(1).toString());
+
+      // Verify second row - should have 2 numbers and 2 tags
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object numbersValue1 = row1.getValue("numbers");
+      assertNotNull(numbersValue1);
+      @SuppressWarnings("unchecked")
+      List<Object> numbersList1 = (List<Object>) numbersValue1;
+      assertEquals(2, numbersList1.size());
+      assertEquals(20, numbersList1.get(0));
+      assertEquals(21, numbersList1.get(1));
+
+      // Verify third row - should have 3 numbers
+      GenericRow row2 = rows.get(2);
+      assertEquals(3, row2.getValue("id"));
+      Object numbersValue2 = row2.getValue("numbers");
+      @SuppressWarnings("unchecked")
+      List<Object> numbersList2 = (List<Object>) numbersValue2;
+      assertEquals(3, numbersList2.size());
+      assertEquals(30, numbersList2.get(0));
+      assertEquals(31, numbersList2.get(1));
+      assertEquals(32, numbersList2.get(2));
+
+      LOGGER.info("List vector test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes two rows whose {@code person} column is a struct (name, age, nested address struct) and verifies
+   * that each struct is exposed as a {@link Map}, including the nested {@code address} map.
+   */
+  @Test
+  public void testArrowMessageDecoderWithStructVectors()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "person");
+    String topicName = "test-struct-vectors";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Close the decoder in finally so a failing assertion does not skip the cleanup.
+    try {
+      // Create Arrow data with Struct vectors
+      byte[] structArrowData = ArrowTestDataUtil.createStructArrowIpcData(2);
+      GenericRow result = decoder.decode(structArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(2, rows.size());
+
+      // Verify first row with nested struct
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object personValue0 = row0.getValue("person");
+      assertNotNull(personValue0);
+      assertTrue(personValue0 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> personMap0 = (Map<String, Object>) personValue0;
+      assertEquals("Person_1", personMap0.get("name").toString());
+      assertEquals(25, personMap0.get("age"));
+      @SuppressWarnings("unchecked")
+      Map<String, Object> address0 = (Map<String, Object>) personMap0.get("address");
+      assertEquals("1 Main St", address0.get("street").toString());
+      assertEquals("City_1", address0.get("city").toString());
+
+      // Verify second row
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object personValue1 = row1.getValue("person");
+      assertNotNull(personValue1);
+      assertTrue(personValue1 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> personMap1 = (Map<String, Object>) personValue1;
+      assertEquals("Person_2", personMap1.get("name").toString());
+      assertEquals(26, personMap1.get("age"));
+      @SuppressWarnings("unchecked")
+      Map<String, Object> address1 = (Map<String, Object>) personMap1.get("address");
+      assertEquals("2 Main St", address1.get("street").toString());
+      assertEquals("City_2", address1.get("city").toString());
+
+      LOGGER.info("Struct vector test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes two rows with a map-typed {@code metadata} column and verifies that each decoded value is a
+   * {@link Map} containing the expected integer values.
+   */
+  @Test
+  public void testArrowMessageDecoderWithMapVectors()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "metadata");
+    String topicName = "test-map-vectors";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Close the decoder in finally so a failing assertion does not skip the cleanup.
+    try {
+      // Create Arrow data with Map vectors
+      byte[] mapArrowData = ArrowTestDataUtil.createMapArrowIpcData(2);
+      GenericRow result = decoder.decode(mapArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(2, rows.size());
+
+      // Verify first row with map data
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object metadataValue0 = row0.getValue("metadata");
+      assertNotNull(metadataValue0);
+      assertTrue(metadataValue0 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> meta0 = (Map<String, Object>) metadataValue0;
+      assertTrue(meta0.values().contains(100));
+      assertTrue(meta0.values().contains(101));
+
+      // Verify second row - should have 3 entries (2 + (1%2) = 3)
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object metadataValue1 = row1.getValue("metadata");
+      assertNotNull(metadataValue1);
+      assertTrue(metadataValue1 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> meta1 = (Map<String, Object>) metadataValue1;
+      assertTrue(meta1.values().contains(200));
+      assertTrue(meta1.values().contains(201));
+      assertTrue(meta1.values().contains(202));
+
+      LOGGER.info("Map vector test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes two rows whose {@code metadata} map has map-typed values and verifies that both nesting levels
+   * are exposed as {@link Map} instances with integer leaf values.
+   */
+  @Test
+  public void testArrowMessageDecoderWithNestedMapValues()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "metadata");
+    String topicName = "test-nested-map-values";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Close the decoder in finally so a failing assertion does not skip the cleanup.
+    try {
+      // Create Arrow data with Map values that are themselves Maps
+      byte[] nestedMapArrowData = ArrowTestDataUtil.createNestedMapArrowIpcData(2);
+      GenericRow result = decoder.decode(nestedMapArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(2, rows.size());
+
+      // Verify first row: metadata is a Map<String, Map<String, Integer>>
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object metadataValue0 = row0.getValue("metadata");
+      assertNotNull(metadataValue0);
+      assertTrue(metadataValue0 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> outer0 = (Map<String, Object>) metadataValue0;
+      assertTrue(outer0.size() >= 2);
+      for (Object innerMapObj : outer0.values()) {
+        assertTrue(innerMapObj instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> inner = (Map<String, Object>) innerMapObj;
+        assertTrue(inner.size() >= 2);
+        // Values should be integers from generator
+        for (Object v : inner.values()) {
+          assertTrue(v instanceof Integer);
+        }
+      }
+
+      // Verify second row similarly
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object metadataValue1 = row1.getValue("metadata");
+      assertNotNull(metadataValue1);
+      assertTrue(metadataValue1 instanceof Map);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> outer1 = (Map<String, Object>) metadataValue1;
+      assertTrue(outer1.size() >= 2);
+      // At least one inner map of the second row is expected to hold exactly three entries.
+      boolean sawThreeInner = false;
+      for (Object innerMapObj : outer1.values()) {
+        assertTrue(innerMapObj instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> inner = (Map<String, Object>) innerMapObj;
+        if (inner.size() == 3) {
+          sawThreeInner = true;
+        }
+      }
+      assertTrue(sawThreeInner);
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Decodes three rows whose {@code items} column is a list of structs and verifies that each row yields a
+   * {@link List} of {@link Map}s with the expected {@code item_name} / {@code item_price} entries.
+   */
+  @Test
+  public void testArrowMessageDecoderWithNestedListStruct()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "items");
+    String topicName = "test-nested-list-struct";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Close the decoder in finally so a failing assertion does not skip the cleanup.
+    try {
+      // Create Arrow data with nested List of Structs
+      byte[] nestedArrowData = ArrowTestDataUtil.createNestedListStructArrowIpcData(3);
+      GenericRow result = decoder.decode(nestedArrowData, null);
+
+      assertNotNull(result);
+      @SuppressWarnings("unchecked")
+      List<GenericRow> rows = (List<GenericRow>) result.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(rows);
+      assertEquals(3, rows.size());
+
+      // Verify first row - should have 1 item (1 + (0%3) = 1)
+      GenericRow row0 = rows.get(0);
+      assertEquals(1, row0.getValue("id"));
+      Object itemsValue0 = row0.getValue("items");
+      assertNotNull(itemsValue0);
+      assertTrue(itemsValue0 instanceof List);
+      @SuppressWarnings("unchecked")
+      List<Object> items0 = (List<Object>) itemsValue0;
+      assertEquals(1, items0.size());
+      @SuppressWarnings("unchecked")
+      Map<String, Object> item00 = (Map<String, Object>) items0.get(0);
+      assertEquals("item_0_0", item00.get("item_name").toString());
+      assertEquals(10.99, (Double) item00.get("item_price"), 0.01);
+
+      // Verify second row - should have 2 items (1 + (1%3) = 2)
+      GenericRow row1 = rows.get(1);
+      assertEquals(2, row1.getValue("id"));
+      Object itemsValue1 = row1.getValue("items");
+      assertNotNull(itemsValue1);
+      @SuppressWarnings("unchecked")
+      List<Object> items1 = (List<Object>) itemsValue1;
+      assertEquals(2, items1.size());
+      @SuppressWarnings("unchecked")
+      Map<String, Object> item10 = (Map<String, Object>) items1.get(0);
+      assertEquals("item_1_0", item10.get("item_name").toString());
+      assertEquals(15.99, (Double) item10.get("item_price"), 0.01);
+      @SuppressWarnings("unchecked")
+      Map<String, Object> item11 = (Map<String, Object>) items1.get(1);
+      assertEquals("item_1_1", item11.get("item_name").toString());
+      assertEquals(16.99, (Double) item11.get("item_price"), 0.01);
+
+      // Verify third row - should have 3 items (1 + (2%3) = 3)
+      GenericRow row2 = rows.get(2);
+      assertEquals(3, row2.getValue("id"));
+      Object itemsValue2 = row2.getValue("items");
+      assertNotNull(itemsValue2);
+      @SuppressWarnings("unchecked")
+      List<Object> items2 = (List<Object>) itemsValue2;
+      assertEquals(3, items2.size());
+
+      LOGGER.info("Nested List-Struct test completed successfully with {} rows", rows.size());
+    } finally {
+      decoder.close();
+    }
+  }
+
+  /**
+   * Feeds list, struct, map and list-of-struct payloads through one decoder instance and verifies that each
+   * decode yields a non-empty row list whose first row exposes the nested column.
+   */
+  @Test
+  public void testArrowNestedStructureCompatibilityWithPinot()
+      throws Exception {
+    ArrowMessageDecoder decoder = new ArrowMessageDecoder();
+
+    Map<String, String> props = new HashMap<>();
+    Set<String> fieldsToRead = Sets.newHashSet("id", "numbers", "person", "metadata", "items");
+    String topicName = "test-nested-compatibility";
+
+    decoder.init(props, fieldsToRead, topicName);
+
+    // Close the decoder in finally so a failing assertion does not skip the cleanup.
+    try {
+      // Test each nested structure type individually for compatibility
+      // Test List compatibility
+      byte[] listData = ArrowTestDataUtil.createListArrowIpcData(1);
+      GenericRow listResult = decoder.decode(listData, null);
+      assertNotNull(listResult, "List data should be decodable");
+
+      // Test Struct compatibility
+      byte[] structData = ArrowTestDataUtil.createStructArrowIpcData(1);
+      GenericRow structResult = decoder.decode(structData, null);
+      assertNotNull(structResult, "Struct data should be decodable");
+
+      // Test Map compatibility
+      byte[] mapData = ArrowTestDataUtil.createMapArrowIpcData(1);
+      GenericRow mapResult = decoder.decode(mapData, null);
+      assertNotNull(mapResult, "Map data should be decodable");
+
+      // Test complex nested structures
+      byte[] nestedData = ArrowTestDataUtil.createNestedListStructArrowIpcData(1);
+      GenericRow nestedResult = decoder.decode(nestedData, null);
+      assertNotNull(nestedResult, "Nested List-Struct data should be decodable");
+
+      // Verify that all simulated nested structures produce valid GenericRow objects
+      @SuppressWarnings("unchecked")
+      List<GenericRow> listRows =
+          (List<GenericRow>) listResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(listRows, "List result should contain rows");
+      assertTrue(listRows.size() > 0, "List result should have at least one row");
+
+      // Verify nested list data is accessible
+      GenericRow firstListRow = listRows.get(0);
+      assertNotNull(firstListRow.getValue("numbers"), "List row should have numbers");
+
+      @SuppressWarnings("unchecked")
+      List<GenericRow> structRows =
+          (List<GenericRow>) structResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(structRows, "Struct result should contain rows");
+      assertTrue(structRows.size() > 0, "Struct result should have at least one row");
+
+      // Verify struct data is accessible
+      GenericRow firstStructRow = structRows.get(0);
+      assertNotNull(firstStructRow.getValue("person"), "Struct row should have person");
+
+      @SuppressWarnings("unchecked")
+      List<GenericRow> mapRows =
+          (List<GenericRow>) mapResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(mapRows, "Map result should contain rows");
+      assertTrue(mapRows.size() > 0, "Map result should have at least one row");
+
+      // Verify map data is accessible
+      GenericRow firstMapRow = mapRows.get(0);
+      assertNotNull(firstMapRow.getValue("metadata"), "Map row should have metadata");
+
+      @SuppressWarnings("unchecked")
+      List<GenericRow> nestedRows =
+          (List<GenericRow>) nestedResult.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      assertNotNull(nestedRows, "Nested result should contain rows");
+      assertTrue(nestedRows.size() > 0, "Nested result should have at least one row");
+
+      // Verify nested list-struct data is accessible
+      GenericRow firstNestedRow = nestedRows.get(0);
+      assertNotNull(firstNestedRow.getValue("items"), "Nested row should have items");
+
+      LOGGER.info(
+          "All nested structure types are compatible with ArrowMessageDecoder and produce valid GenericRow objects");
+    } finally {
+      decoder.close();
+    }
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java
 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java
new file mode 100644
index 00000000000..206dc7d85a5
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.arrow.util;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryEncoder;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+
+public class ArrowTestDataUtil {
+
+  private ArrowTestDataUtil() { // private constructor: this class is not meant to be instantiated by callers
+  }
+
+  /**
+   * Builds a two-column batch ({@code id}: nullable signed 32-bit int, {@code name}: nullable UTF-8 string)
+   * and serializes it with {@code writeArrowDataToBytes}.
+   *
+   * <p>Row {@code i} (zero-based) holds {@code id = i + 1} and {@code name = "name_" + (i + 1)}.
+   *
+   * @param numRows number of rows to populate
+   * @return the serialized bytes returned by {@code writeArrowDataToBytes}
+   * @throws Exception if vector population or serialization fails
+   */
+  public static byte[] createValidArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Schema schema = new Schema(Arrays.asList(idField, nameField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector nameVector = (VarCharVector) root.getVector("name");
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        // The byte budget is only a hint; setSafe below grows the buffer if the names turn out longer.
+        nameVector.allocateNew(numRows * 10, numRows);
+
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          // Encode explicitly as UTF-8 (the column is declared Utf8) instead of the platform default charset.
+          nameVector.setSafe(i, ("name_" + (i + 1)).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+
+        idVector.setValueCount(numRows);
+        nameVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds a five-column batch covering several Arrow types and serializes it with
+   * {@code writeArrowDataToBytes}.
+   *
+   * <p>Columns for row {@code i} (zero-based):
+   * <ul>
+   *   <li>{@code id} (signed 32-bit int): {@code i + 1}</li>
+   *   <li>{@code name} (UTF-8): {@code "product_" + (i + 1)}</li>
+   *   <li>{@code price} (double): {@code 10.99 + i * 5.0}</li>
+   *   <li>{@code active} (bool): set for even {@code i}, cleared for odd {@code i}</li>
+   *   <li>{@code timestamp} (millisecond timestamp, no time zone): wall-clock time at creation plus
+   *       {@code i} seconds</li>
+   * </ul>
+   *
+   * @param numRows number of rows to populate
+   * @return the serialized bytes returned by {@code writeArrowDataToBytes}
+   * @throws Exception if vector population or serialization fails
+   */
+  public static byte[] createMultiTypeArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field priceField =
+          new Field(
+              "price",
+              FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+              null);
+      Field activeField = new Field("active", FieldType.nullable(new ArrowType.Bool()), null);
+      Field timestampField =
+          new Field(
+              "timestamp",
+              FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
+              null);
+
+      Schema schema =
+          new Schema(Arrays.asList(idField, nameField, priceField, activeField, timestampField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector nameVector = (VarCharVector) root.getVector("name");
+        Float8Vector priceVector = (Float8Vector) root.getVector("price");
+        BitVector activeVector = (BitVector) root.getVector("active");
+        TimeStampMilliVector timestampVector = (TimeStampMilliVector) root.getVector("timestamp");
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        // The byte budget is only a hint; setSafe below grows the buffer if the names turn out longer.
+        nameVector.allocateNew(numRows * 20, numRows);
+        priceVector.allocateNew(numRows);
+        activeVector.allocateNew(numRows);
+        timestampVector.allocateNew(numRows);
+
+        long baseTime = System.currentTimeMillis();
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          // Encode explicitly as UTF-8 (the column is declared Utf8) instead of the platform default charset.
+          nameVector.setSafe(i, ("product_" + (i + 1)).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+          priceVector.set(i, 10.99 + (i * 5.0));
+          activeVector.set(i, i % 2 == 0 ? 1 : 0);
+          timestampVector.set(i, baseTime + (i * 1000L));
+        }
+
+        idVector.setValueCount(numRows);
+        nameVector.setValueCount(numRows);
+        priceVector.setValueCount(numRows);
+        activeVector.setValueCount(numRows);
+        timestampVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Writes {@code batchCount} record batches of {@code rowsPerBatch} rows each into one Arrow stream.
+   *
+   * <p>Columns: {@code id} (signed 32-bit int, numbered continuously from 1 across all batches),
+   * {@code batch_num} (zero-based batch index) and {@code value} (UTF-8,
+   * {@code "batch_" + batch + "_row_" + row}).
+   *
+   * @param batchCount number of record batches to write
+   * @param rowsPerBatch number of rows in every batch
+   * @return the bytes written by the {@link ArrowStreamWriter}
+   * @throws Exception if vector population or stream writing fails
+   */
+  public static byte[] createMultiBatchArrowIpcData(int batchCount, int rowsPerBatch)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field batchField =
+          new Field("batch_num", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field valueField = new Field("value", FieldType.nullable(new ArrowType.Utf8()), null);
+      Schema schema = new Schema(Arrays.asList(idField, batchField, valueField));
+
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      try (WritableByteChannel channel = Channels.newChannel(outputStream);
+          VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+          ArrowStreamWriter writer = new ArrowStreamWriter(root, null, channel)) {
+
+        writer.start();
+
+        IntVector idVector = (IntVector) root.getVector("id");
+        IntVector batchVector = (IntVector) root.getVector("batch_num");
+        VarCharVector valueVector = (VarCharVector) root.getVector("value");
+
+        int totalRowId = 1;
+        for (int batch = 0; batch < batchCount; batch++) {
+          root.allocateNew();
+          idVector.allocateNew(rowsPerBatch);
+          batchVector.allocateNew(rowsPerBatch);
+          // 15 bytes per row is only a hint: labels such as "batch_10_row_1000" are longer, so the
+          // writes below use setSafe, which grows the data buffer when needed.
+          valueVector.allocateNew(rowsPerBatch * 15, rowsPerBatch);
+
+          for (int row = 0; row < rowsPerBatch; row++) {
+            idVector.set(row, totalRowId++);
+            batchVector.set(row, batch);
+            // Encode explicitly as UTF-8 (the column is declared Utf8) instead of the platform default.
+            byte[] label = ("batch_" + batch + "_row_" + row).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            valueVector.setSafe(row, label);
+          }
+
+          idVector.setValueCount(rowsPerBatch);
+          batchVector.setValueCount(rowsPerBatch);
+          valueVector.setValueCount(rowsPerBatch);
+          root.setRowCount(rowsPerBatch);
+
+          writer.writeBatch();
+        }
+
+        writer.end();
+        return outputStream.toByteArray();
+      }
+    }
+  }
+
+  /**
+   * Produces an Arrow stream for an {@code id} (signed 32-bit int) / {@code name} (UTF-8) schema in which the
+   * writer is started and ended without writing any record batch.
+   *
+   * @return the bytes emitted by the {@link ArrowStreamWriter}
+   * @throws Exception if the stream cannot be written
+   */
+  public static byte[] createEmptyArrowIpcData()
+      throws Exception {
+    Schema emptySchema =
+        new Schema(
+            Arrays.asList(
+                new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null),
+                new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        VectorSchemaRoot root = VectorSchemaRoot.create(emptySchema, allocator)) {
+      root.setRowCount(0);
+
+      // Start and immediately end the stream; the writer and channel are closed before the bytes are read.
+      try (WritableByteChannel sinkChannel = Channels.newChannel(sink);
+          ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, null, sinkChannel)) {
+        streamWriter.start();
+        streamWriter.end();
+      }
+
+      return sink.toByteArray();
+    }
+  }
+
+  /**
+   * Builds a batch whose {@code category} column is dictionary-encoded and serializes it, together with the
+   * dictionary, through {@code writeArrowDataToBytes}.
+   *
+   * <p>The dictionary (id 1, signed 32-bit indices) holds "Electronics", "Books", "Clothing" and "Home". Row
+   * {@code i} (zero-based) holds {@code id = i + 1}, the dictionary entry at {@code i % 4} as category and
+   * {@code price = 19.99 + i * 10.0}.
+   *
+   * @param numRows number of rows to populate
+   * @return the serialized bytes returned by {@code writeArrowDataToBytes}
+   * @throws Exception if vector population, encoding or serialization fails
+   */
+  public static byte[] createDictionaryEncodedArrowIpcData(int numRows)
+      throws Exception {
+    List<String> dictionaryValues = Arrays.asList("Electronics", "Books", "Clothing", "Home");
+    DictionaryEncoding dictionaryEncoding =
+        new DictionaryEncoding(1L, false, new ArrowType.Int(32, true));
+
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        VarCharVector dictionaryVector = new VarCharVector("category_dict", allocator);
+        IntVector idVector = new IntVector("id", allocator);
+        Float8Vector priceVector = new Float8Vector("price", allocator);
+        VarCharVector categoryUnencoded =
+            new VarCharVector(
+                "category",
+                new FieldType(true, new ArrowType.Utf8(), dictionaryEncoding),
+                allocator)) {
+
+      dictionaryVector.allocateNew();
+      for (int i = 0; i < dictionaryValues.size(); i++) {
+        // Encode explicitly as UTF-8 instead of the platform default charset; setSafe grows the buffer.
+        dictionaryVector.setSafe(i, dictionaryValues.get(i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+      }
+      dictionaryVector.setValueCount(dictionaryValues.size());
+
+      Dictionary dictionary = new Dictionary(dictionaryVector, dictionaryEncoding);
+      DictionaryProvider.MapDictionaryProvider dictionaryProvider =
+          new DictionaryProvider.MapDictionaryProvider();
+      dictionaryProvider.put(dictionary);
+
+      idVector.allocateNew(numRows);
+      priceVector.allocateNew(numRows);
+      // Only the value count is given here, not a byte budget, so the writes below use setSafe to grow
+      // the data buffer as needed.
+      categoryUnencoded.allocateNew(numRows);
+
+      for (int i = 0; i < numRows; i++) {
+        idVector.set(i, i + 1);
+        String category = dictionaryValues.get(i % dictionaryValues.size());
+        categoryUnencoded.setSafe(i, category.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        priceVector.set(i, 19.99 + (i * 10.0));
+      }
+      idVector.setValueCount(numRows);
+      priceVector.setValueCount(numRows);
+      categoryUnencoded.setValueCount(numRows);
+
+      // Replace the plain string column with its dictionary-index representation before building the root.
+      try (org.apache.arrow.vector.FieldVector encodedCategoryVector =
+          (org.apache.arrow.vector.FieldVector)
+              DictionaryEncoder.encode(categoryUnencoded, dictionary)) {
+        List<Field> fields =
+            Arrays.asList(
+                idVector.getField(), encodedCategoryVector.getField(), priceVector.getField());
+        List<org.apache.arrow.vector.FieldVector> vectors =
+            Arrays.asList(idVector, encodedCategoryVector, priceVector);
+        try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) {
+          return writeArrowDataToBytes(root, dictionaryProvider);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a batch with two list columns and serializes it with {@code writeArrowDataToBytes}.
+   *
+   * <p>Row {@code i} (zero-based) holds {@code id = i + 1}, a {@code numbers} list of {@code i + 1} ints
+   * ({@code (i + 1) * 10 + j} for {@code j = 0..i}) and a {@code tags} list of two strings
+   * ({@code "tag_" + i + "_0"} and {@code "tag_" + i + "_1"}).
+   *
+   * @param numRows number of rows to populate
+   * @return the serialized bytes returned by {@code writeArrowDataToBytes}
+   * @throws Exception if vector population or serialization fails
+   */
+  public static byte[] createListArrowIpcData(int numRows)
+      throws Exception {
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field numbersElementField =
+          new Field("$data$", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field numbersField =
+          new Field(
+              "numbers",
+              FieldType.nullable(new ArrowType.List()),
+              Arrays.asList(numbersElementField));
+
+      Field tagsElementField = new Field("$data$", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field tagsField =
+          new Field(
+              "tags", FieldType.nullable(new ArrowType.List()), Arrays.asList(tagsElementField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, numbersField, tagsField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        ListVector numbersVector = (ListVector) root.getVector("numbers");
+        ListVector tagsVector = (ListVector) root.getVector("tags");
+        IntVector numbersChild = (IntVector) numbersVector.getDataVector();
+        VarCharVector tagsChild = (VarCharVector) tagsVector.getDataVector();
+
+        root.allocateNew();
+        idVector.allocateNew(numRows);
+        numbersVector.allocateNew();
+        tagsVector.allocateNew();
+
+        // Running write positions in the flattened child vectors shared by all rows.
+        int numbersElemIndex = 0;
+        int tagsElemIndex = 0;
+
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+
+          numbersVector.startNewValue(i);
+          for (int j = 0; j <= i; j++) {
+            numbersChild.setSafe(numbersElemIndex++, (i + 1) * 10 + j);
+          }
+          numbersVector.endValue(i, i + 1);
+
+          tagsVector.startNewValue(i);
+          for (int j = 0; j < 2; j++) {
+            // Encode explicitly as UTF-8 (the element type is Utf8) instead of the platform default charset.
+            byte[] tag = ("tag_" + i + "_" + j).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            tagsChild.setSafe(tagsElemIndex++, tag);
+          }
+          tagsVector.endValue(i, 2);
+        }
+
+        idVector.setValueCount(numRows);
+        numbersChild.setValueCount(numbersElemIndex);
+        numbersVector.setValueCount(numRows);
+        tagsChild.setValueCount(tagsElemIndex);
+        tagsVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds one Arrow stream batch with schema {@code id: int32} plus a nullable {@code person} struct
+   * ({@code name: utf8}, {@code age: int32}, and a nested {@code address} struct with utf8 {@code street}
+   * and {@code city}).
+   *
+   * <p>Row {@code i} (0-based) holds {@code id = i + 1}, {@code name = "Person_" + (i + 1)},
+   * {@code age = 25 + i}, {@code street = (i + 1) + " Main St"} and {@code city = "City_" + (i + 1)}.
+   *
+   * @param numRows number of rows to generate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root
+   * @throws Exception propagated from vector population or from {@code writeArrowDataToBytes}
+   */
+  public static byte[] createStructArrowIpcData(int numRows)
+      throws Exception {
+    // The string fields are declared ArrowType.Utf8: encode explicitly instead of using the platform charset.
+    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field nameField = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field ageField = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field streetField = new Field("street", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field cityField = new Field("city", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field addressField =
+          new Field(
+              "address",
+              FieldType.nullable(new ArrowType.Struct()),
+              Arrays.asList(streetField, cityField));
+
+      Field personField =
+          new Field(
+              "person",
+              FieldType.nullable(new ArrowType.Struct()),
+              Arrays.asList(nameField, ageField, addressField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, personField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        StructVector personVector = (StructVector) root.getVector("person");
+
+        root.allocateNew();
+        // id is written below with set() rather than setSafe(), so size it for numRows up front.
+        idVector.allocateNew(numRows);
+        personVector.allocateNew();
+
+        VarCharVector nameVector = (VarCharVector) personVector.getChild("name");
+        IntVector ageVector = (IntVector) personVector.getChild("age");
+        StructVector addressVector = (StructVector) personVector.getChild("address");
+        VarCharVector streetVector = (VarCharVector) addressVector.getChild("street");
+        VarCharVector cityVector = (VarCharVector) addressVector.getChild("city");
+
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          // Both the outer struct and the nested address struct are flagged as defined for this row.
+          personVector.setIndexDefined(i);
+          addressVector.setIndexDefined(i);
+          nameVector.setSafe(i, ("Person_" + (i + 1)).getBytes(utf8));
+          ageVector.setSafe(i, 25 + i);
+          streetVector.setSafe(i, ((i + 1) + " Main St").getBytes(utf8));
+          cityVector.setSafe(i, ("City_" + (i + 1)).getBytes(utf8));
+        }
+
+        idVector.setValueCount(numRows);
+        personVector.setValueCount(numRows);
+        nameVector.setValueCount(numRows);
+        ageVector.setValueCount(numRows);
+        addressVector.setValueCount(numRows);
+        streetVector.setValueCount(numRows);
+        cityVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds one Arrow stream batch with schema {@code id: int32} plus a nullable, unsorted-keys map column
+   * {@code metadata} from utf8 keys to nullable int32 values.
+   *
+   * <p>Row {@code i} (0-based) holds {@code id = i + 1} and {@code 2 + (i % 2)} map entries; entry {@code j}
+   * has key {@code "key_" + i + "_" + j} and value {@code (i + 1) * 100 + j}.
+   *
+   * @param numRows number of rows to generate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root
+   * @throws Exception propagated from vector population or from {@code writeArrowDataToBytes}
+   */
+  public static byte[] createMapArrowIpcData(int numRows)
+      throws Exception {
+    // The key field is declared ArrowType.Utf8: encode explicitly instead of using the platform charset.
+    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field keyField =
+          new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null);
+      Field valField =
+          new Field(MapVector.VALUE_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field entriesField =
+          new Field(
+              MapVector.DATA_VECTOR_NAME,
+              FieldType.notNullable(new ArrowType.Struct()),
+              Arrays.asList(keyField, valField));
+      Field mapField =
+          new Field(
+              "metadata",
+              FieldType.nullable(new ArrowType.Map(false)),
+              Arrays.asList(entriesField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, mapField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        MapVector mapVector = (MapVector) root.getVector("metadata");
+        StructVector entries = (StructVector) mapVector.getDataVector();
+        VarCharVector keyVector = (VarCharVector) entries.getChild(MapVector.KEY_NAME);
+        IntVector valueVector = (IntVector) entries.getChild(MapVector.VALUE_NAME);
+
+        root.allocateNew();
+        // id is written below with set() rather than setSafe(), so size it for numRows up front.
+        idVector.allocateNew(numRows);
+        mapVector.allocateNew();
+
+        // Running position in the entries struct, which is shared by the maps of all rows.
+        int entryIndex = 0;
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          int entriesCount = 2 + (i % 2);
+          mapVector.startNewValue(i);
+          for (int j = 0; j < entriesCount; j++) {
+            keyVector.setSafe(entryIndex, ("key_" + i + "_" + j).getBytes(utf8));
+            valueVector.setSafe(entryIndex, (i + 1) * 100 + j);
+            entries.setIndexDefined(entryIndex);
+            entryIndex++;
+          }
+          mapVector.endValue(i, entriesCount);
+        }
+
+        idVector.setValueCount(numRows);
+        keyVector.setValueCount(entryIndex);
+        valueVector.setValueCount(entryIndex);
+        entries.setValueCount(entryIndex);
+        mapVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds one Arrow stream batch with schema {@code id: int32} plus a nullable map column {@code metadata}
+   * whose values are themselves maps (utf8 key to nullable int32 value).
+   *
+   * <p>Row {@code i} (0-based) holds {@code id = i + 1} and {@code 2 + (i % 2)} outer entries. Outer entry
+   * {@code j} has key {@code "outer_key_" + i + "_" + j} and an inner map of {@code 2 + (j % 2)} entries;
+   * inner entry {@code k} has key {@code "inner_key_" + i + "_" + j + "_" + k} and value
+   * {@code (i + 1) * 1000 + j * 10 + k}.
+   *
+   * @param numRows number of rows to generate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root
+   * @throws Exception propagated from vector population or from {@code writeArrowDataToBytes}
+   */
+  public static byte[] createNestedMapArrowIpcData(int numRows)
+      throws Exception {
+    // The key fields are declared ArrowType.Utf8: encode explicitly instead of using the platform charset.
+    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      // Define inner map (value of outer map)
+      Field innerKeyField =
+          new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null);
+      Field innerValField =
+          new Field(MapVector.VALUE_NAME, FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Field innerEntriesField =
+          new Field(
+              MapVector.DATA_VECTOR_NAME,
+              FieldType.notNullable(new ArrowType.Struct()),
+              Arrays.asList(innerKeyField, innerValField));
+      Field innerMapField =
+          new Field(
+              MapVector.VALUE_NAME,
+              FieldType.nullable(new ArrowType.Map(false)),
+              Arrays.asList(innerEntriesField));
+
+      // Define outer map with value as the inner map
+      Field outerKeyField =
+          new Field(MapVector.KEY_NAME, FieldType.notNullable(new ArrowType.Utf8()), null);
+      Field outerEntriesField =
+          new Field(
+              MapVector.DATA_VECTOR_NAME,
+              FieldType.notNullable(new ArrowType.Struct()),
+              Arrays.asList(outerKeyField, innerMapField));
+      Field outerMapField =
+          new Field(
+              "metadata",
+              FieldType.nullable(new ArrowType.Map(false)),
+              Arrays.asList(outerEntriesField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, outerMapField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        MapVector outerMapVector = (MapVector) root.getVector("metadata");
+        StructVector outerEntries = (StructVector) outerMapVector.getDataVector();
+        VarCharVector outerKeyVector = (VarCharVector) outerEntries.getChild(MapVector.KEY_NAME);
+        MapVector innerMapVector = (MapVector) outerEntries.getChild(MapVector.VALUE_NAME);
+        StructVector innerEntries = (StructVector) innerMapVector.getDataVector();
+        VarCharVector innerKeyVector = (VarCharVector) innerEntries.getChild(MapVector.KEY_NAME);
+        IntVector innerValueVector = (IntVector) innerEntries.getChild(MapVector.VALUE_NAME);
+
+        root.allocateNew();
+        // id is written below with set() rather than setSafe(), so size it for numRows up front.
+        idVector.allocateNew(numRows);
+        outerMapVector.allocateNew();
+
+        // Running positions in the outer and inner entries structs, each shared across all rows.
+        int outerEntryIndex = 0;
+        int innerEntryIndex = 0;
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+
+          int outerEntriesCount = 2 + (i % 2); // 2 or 3 outer entries
+          outerMapVector.startNewValue(i);
+          for (int j = 0; j < outerEntriesCount; j++) {
+            // Set outer key
+            outerKeyVector.setSafe(outerEntryIndex, ("outer_key_" + i + "_" + j).getBytes(utf8));
+
+            // Populate inner map for this outer entry at aligned index
+            innerMapVector.startNewValue(outerEntryIndex);
+            int innerEntriesCount = 2 + (j % 2); // 2 or 3 inner entries
+            for (int k = 0; k < innerEntriesCount; k++) {
+              innerKeyVector.setSafe(
+                  innerEntryIndex, ("inner_key_" + i + "_" + j + "_" + k).getBytes(utf8));
+              innerValueVector.setSafe(innerEntryIndex, (i + 1) * 1000 + j * 10 + k);
+              innerEntries.setIndexDefined(innerEntryIndex);
+              innerEntryIndex++;
+            }
+            innerMapVector.endValue(outerEntryIndex, innerEntriesCount);
+
+            outerEntries.setIndexDefined(outerEntryIndex);
+            outerEntryIndex++;
+          }
+          outerMapVector.endValue(i, outerEntriesCount);
+        }
+
+        idVector.setValueCount(numRows);
+        outerKeyVector.setValueCount(outerEntryIndex);
+        innerKeyVector.setValueCount(innerEntryIndex);
+        innerValueVector.setValueCount(innerEntryIndex);
+        innerEntries.setValueCount(innerEntryIndex);
+        innerMapVector.setValueCount(outerEntryIndex);
+        outerEntries.setValueCount(outerEntryIndex);
+        outerMapVector.setValueCount(numRows);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Builds one Arrow stream batch with schema {@code id: int32} plus a nullable list column {@code items}
+   * whose elements are structs of {@code item_name: utf8} and {@code item_price: float64}.
+   *
+   * <p>Row {@code i} (0-based) holds {@code id = i + 1} and {@code 1 + (i % 3)} items; item {@code j} has
+   * name {@code "item_" + i + "_" + j} and price {@code 10.99 + (i * 5.0) + j}.
+   *
+   * @param numRows number of rows to generate
+   * @return the bytes returned by {@code writeArrowDataToBytes} for the populated root
+   * @throws Exception propagated from vector population or from {@code writeArrowDataToBytes}
+   */
+  public static byte[] createNestedListStructArrowIpcData(int numRows)
+      throws Exception {
+    // item_name is declared ArrowType.Utf8: encode explicitly instead of using the platform charset.
+    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+      Field itemNameField = new Field("item_name", FieldType.nullable(new ArrowType.Utf8()), null);
+      Field itemPriceField =
+          new Field(
+              "item_price",
+              FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
+              null);
+      Field itemStructField =
+          new Field(
+              "$data$",
+              FieldType.nullable(new ArrowType.Struct()),
+              Arrays.asList(itemNameField, itemPriceField));
+
+      Field itemsField =
+          new Field(
+              "items", FieldType.nullable(new ArrowType.List()), Arrays.asList(itemStructField));
+
+      Field idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null);
+      Schema schema = new Schema(Arrays.asList(idField, itemsField));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        ListVector itemsVector = (ListVector) root.getVector("items");
+        StructVector itemStructVector = (StructVector) itemsVector.getDataVector();
+        VarCharVector itemNameVector = (VarCharVector) itemStructVector.getChild("item_name");
+        Float8Vector itemPriceVector = (Float8Vector) itemStructVector.getChild("item_price");
+
+        root.allocateNew();
+        // id is written below with set() rather than setSafe(), so size it for numRows up front.
+        idVector.allocateNew(numRows);
+        itemsVector.allocateNew();
+
+        // Running position in the element struct vector, which is shared by the lists of all rows.
+        int structIndex = 0;
+        for (int i = 0; i < numRows; i++) {
+          idVector.set(i, i + 1);
+          int itemsCount = 1 + (i % 3);
+          itemsVector.startNewValue(i);
+          for (int j = 0; j < itemsCount; j++) {
+            itemNameVector.setSafe(structIndex, ("item_" + i + "_" + j).getBytes(utf8));
+            itemPriceVector.setSafe(structIndex, 10.99 + (i * 5.0) + j);
+            itemStructVector.setIndexDefined(structIndex);
+            structIndex++;
+          }
+          itemsVector.endValue(i, itemsCount);
+        }
+
+        idVector.setValueCount(numRows);
+        itemsVector.setValueCount(numRows);
+        itemNameVector.setValueCount(structIndex);
+        itemPriceVector.setValueCount(structIndex);
+        root.setRowCount(numRows);
+
+        return writeArrowDataToBytes(root, null);
+      }
+    }
+  }
+
+  /**
+   * Writes the current contents of {@code root} through an {@link ArrowStreamWriter} into an in-memory
+   * buffer as a single batch (start, one {@code writeBatch}, end) and returns the buffered bytes.
+   *
+   * @param root the populated vector schema root to serialize
+   * @param dictionaryProvider passed straight to the writer; the callers visible in this file pass {@code null}
+   * @return everything the writer emitted
+   * @throws Exception propagated from the writer or from closing the channel
+   */
+  private static byte[] writeArrowDataToBytes(VectorSchemaRoot root, DictionaryProvider dictionaryProvider)
+      throws Exception {
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    try (WritableByteChannel sinkChannel = Channels.newChannel(sink);
+        ArrowStreamWriter streamWriter = new ArrowStreamWriter(root, dictionaryProvider, sinkChannel)) {
+      streamWriter.start();
+      streamWriter.writeBatch();
+      streamWriter.end();
+    }
+    // The buffer is read only after the writer and the channel have been closed.
+    return sink.toByteArray();
+  }
+}
diff --git a/pinot-plugins/pinot-input-format/pom.xml 
b/pinot-plugins/pinot-input-format/pom.xml
index e9aabe0079f..78c6057fe0f 100644
--- a/pinot-plugins/pinot-input-format/pom.xml
+++ b/pinot-plugins/pinot-input-format/pom.xml
@@ -37,6 +37,7 @@
   </properties>
 
   <modules>
+    <module>pinot-arrow</module>
     <module>pinot-avro</module>
     <module>pinot-avro-base</module>
     <module>pinot-clp-log</module>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to