This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b69453e  Adding support for Protobuf input format  (#5293)
b69453e is described below

commit b69453e272b09314071183fc40a6516d64b0a412
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed May 27 23:50:14 2020 +0530

    Adding support for Protobuf input format  (#5293)
    
    Authored-by: Kartik Khare <kartikkhare@Kartiks-MacBook-Pro.local>
---
 .../v0_deprecated/pinot-hadoop/pom.xml             |   11 +
 .../hadoop/job/mappers/SegmentCreationMapper.java  |   10 +
 .../v0_deprecated/pinot-spark/pom.xml              |   12 +-
 .../spark/jobs/SparkSegmentCreationFunction.java   |   10 +
 .../{ => pinot-protobuf}/pom.xml                   |   45 +-
 .../protobuf/ProtoBufRecordExtractor.java          |   49 +
 .../inputformat/protobuf/ProtoBufRecordReader.java |  137 +++
 .../protobuf/ProtoBufRecordReaderConfig.java       |   18 +-
 .../protobuf/ProtoBufRecordReaderTest.java         |  173 ++++
 .../pinot/plugin/inputformat/protobuf/Sample.java  | 1075 ++++++++++++++++++++
 .../pinot-protobuf/src/test/resources/sample.desc  |    8 +
 .../pinot-protobuf/src/test/resources/sample.proto |    9 +
 pinot-plugins/pinot-input-format/pom.xml           |    1 +
 .../apache/pinot/spi/data/readers/FileFormat.java  |    2 +-
 .../org/apache/pinot/spi/utils/ResourceFinder.java |   78 ++
 15 files changed, 1600 insertions(+), 38 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
index 6b7450a..1fd7033 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml
@@ -147,6 +147,17 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-protobuf</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </dependency>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 47242e5..52c3393 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -44,6 +44,7 @@ import 
org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -330,6 +331,15 @@ public class SegmentCreationMapper extends 
Mapper<LongWritable, Text, LongWritab
           return readerConfig;
         }
       }
+
+      if (fileFormat == FileFormat.PROTO) {
+        try (InputStream inputStream = 
FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
+          ProtoBufRecordReaderConfig readerConfig =
+              JsonUtils.inputStreamToObject(inputStream, 
ProtoBufRecordReaderConfig.class);
+          _logger.info("Using Protocol Buffer record reader config: {}", 
readerConfig);
+          return readerConfig;
+        }
+      }
     }
     return null;
   }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
index 964a585..e8850b2 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml
@@ -160,7 +160,17 @@
         </exclusion>
       </exclusions>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-protobuf</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <!-- Spark  -->
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
index 80d7434..b234e3e 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -297,6 +298,15 @@ public class SparkSegmentCreationFunction implements 
Serializable {
           return readerConfig;
         }
       }
+
+      if (fileFormat == FileFormat.PROTO) {
+        try (InputStream inputStream = 
FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) {
+          ProtoBufRecordReaderConfig readerConfig =
+              JsonUtils.inputStreamToObject(inputStream, 
ProtoBufRecordReaderConfig.class);
+          _logger.info("Using Protocol Buffer record reader config: {}", 
readerConfig);
+          return readerConfig;
+        }
+      }
     }
     return null;
   }
diff --git a/pinot-plugins/pinot-input-format/pom.xml 
b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
similarity index 55%
copy from pinot-plugins/pinot-input-format/pom.xml
copy to pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
index bf7dd9c..d018a89 100644
--- a/pinot-plugins/pinot-input-format/pom.xml
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml
@@ -19,54 +19,33 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>pinot-plugins</artifactId>
+    <artifactId>pinot-input-format</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>${revision}${sha1}</version>
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pinot-input-format</artifactId>
-  <packaging>pom</packaging>
-  <name>Pinot Input Format </name>
+
+  <artifactId>pinot-protobuf</artifactId>
+  <name>Pinot Protocol Buffers</name>
   <url>https://pinot.apache.org/</url>
   <properties>
-    <pinot.root>${basedir}/../..</pinot.root>
-    <plugin.type>pinot-input-format</plugin.type>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <proto.version>3.11.4</proto.version>
   </properties>
-
-  <modules>
-    <module>pinot-avro</module>
-    <module>pinot-avro-base</module>
-    <module>pinot-confluent-avro</module>
-    <module>pinot-orc</module>
-    <module>pinot-json</module>
-    <module>pinot-parquet</module>
-    <module>pinot-csv</module>
-    <module>pinot-thrift</module>
-  </modules>
-
   <dependencies>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-    <!-- test -->
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <scope>test</scope>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-spi</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${proto.version}</version>
     </dependency>
   </dependencies>
 </project>
diff --git 
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
new file mode 100644
index 0000000..6978fe7
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordExtractor;
+import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+
+
+/**
+ * Extracts the requested fields of a Protocol Buffers {@link Message} into a {@link GenericRow}.
+ */
+public class ProtoBufRecordExtractor implements RecordExtractor<Message> {
+
+  private Set<String> _fields;
+
+  /**
+   * Stores the names of the fields to extract. {@code recordExtractorConfig} is not used by this extractor.
+   */
+  @Override
+  public void init(Set<String> fields, RecordExtractorConfig recordExtractorConfig) {
+    _fields = fields;
+  }
+
+  /**
+   * Copies every requested field of {@code from} into {@code to}, converting each value with
+   * {@link RecordReaderUtils#convert(Object)}.
+   *
+   * <p>A requested field that is not declared in the message descriptor is stored as {@code null}:
+   * {@link Descriptors.Descriptor#findFieldByName(String)} returns {@code null} for unknown names, and passing
+   * {@code null} to {@link Message#getField} would throw.
+   *
+   * @return {@code to}, populated with the extracted values
+   */
+  @Override
+  public GenericRow extract(Message from, GenericRow to) {
+    Descriptors.Descriptor descriptor = from.getDescriptorForType();
+    for (String fieldName : _fields) {
+      Descriptors.FieldDescriptor fieldDescriptor = descriptor.findFieldByName(fieldName);
+      if (fieldDescriptor == null) {
+        // Not part of the protobuf schema: record the absence instead of failing the whole record
+        to.putValue(fieldName, null);
+        continue;
+      }
+      Object value = from.getField(fieldDescriptor);
+      to.putValue(fieldName, RecordReaderUtils.convert(value));
+    }
+    return to;
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
new file mode 100644
index 0000000..2da8629
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+import org.apache.pinot.spi.utils.ResourceFinder;
+
+
+/**
+ * Record reader for files holding length-delimited Protocol Buffers messages (the format produced by
+ * {@link Message#writeDelimitedTo}). Messages are parsed dynamically with the descriptor referenced by
+ * {@link ProtoBufRecordReaderConfig#getDescriptorFile()}.
+ */
+public class ProtoBufRecordReader implements RecordReader {
+  private File _dataFile;
+  private ProtoBufRecordExtractor _recordExtractor;
+
+  private InputStream _inputStream;
+  private boolean _hasNext;
+  private DynamicMessage _dynamicMessage;
+
+  /**
+   * Peeks one byte ahead to check whether another message is available, without consuming it.
+   * Requires {@code _inputStream} to support mark/reset.
+   */
+  private boolean hasMoreToRead()
+      throws IOException {
+    _inputStream.mark(1);
+    int nextByte = _inputStream.read();
+    _inputStream.reset();
+    return nextByte != -1;
+  }
+
+  /**
+   * (Re)opens the data file and primes {@code _hasNext}. Closes the stream again if priming fails.
+   */
+  private void init()
+      throws IOException {
+    _inputStream = RecordReaderUtils.getBufferedInputStream(_dataFile);
+    try {
+      _hasNext = hasMoreToRead();
+    } catch (Exception e) {
+      _inputStream.close();
+      throw e;
+    }
+  }
+
+  /**
+   * Loads the message descriptor and opens {@code dataFile} for reading.
+   *
+   * @param recordReaderConfig must be a {@link ProtoBufRecordReaderConfig} with a descriptor file set
+   * @throws IllegalArgumentException if the config is missing, of the wrong type, or has no descriptor file
+   * @throws IOException if the descriptor cannot be loaded or the data file cannot be opened
+   */
+  @Override
+  public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    if (!(recordReaderConfig instanceof ProtoBufRecordReaderConfig)) {
+      throw new IllegalArgumentException(
+          "ProtoBufRecordReader requires a ProtoBufRecordReaderConfig, got: " + recordReaderConfig);
+    }
+    _dataFile = dataFile;
+    ProtoBufRecordReaderConfig protoBufRecordReaderConfig = (ProtoBufRecordReaderConfig) recordReaderConfig;
+    Descriptors.Descriptor descriptor;
+    // The descriptor stream is only needed while parsing; close it so it does not leak
+    try (InputStream fin = getDescriptorFileInputStream(protoBufRecordReaderConfig)) {
+      descriptor = buildProtoBufDescriptor(fin);
+    }
+    _recordExtractor = new ProtoBufRecordExtractor();
+    _recordExtractor.init(fieldsToRead, null);
+    _dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
+    init();
+  }
+
+  /**
+   * Parses a serialized {@link DescriptorProtos.FileDescriptorSet} and returns the first message type of its
+   * first file.
+   *
+   * <p>NOTE: the file descriptor is built with no dependencies, so .proto files that import other files are not
+   * supported.
+   *
+   * @throws IOException if the set cannot be parsed, is empty, defines no message, or fails validation
+   */
+  private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin)
+      throws IOException {
+    DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(fin);
+    if (set.getFileCount() == 0) {
+      throw new IOException("Descriptor file does not contain any file descriptor");
+    }
+    Descriptors.FileDescriptor fileDescriptor;
+    try {
+      fileDescriptor = Descriptors.FileDescriptor.buildFrom(set.getFile(0), new Descriptors.FileDescriptor[0]);
+    } catch (Descriptors.DescriptorValidationException e) {
+      throw new IOException("Descriptor file validation failed", e);
+    }
+    if (fileDescriptor.getMessageTypes().isEmpty()) {
+      throw new IOException("Descriptor file does not define any message type: " + fileDescriptor.getName());
+    }
+    return fileDescriptor.getMessageTypes().get(0);
+  }
+
+  /**
+   * Opens the descriptor file referenced by the config. The caller must close the returned stream.
+   */
+  private InputStream getDescriptorFileInputStream(ProtoBufRecordReaderConfig protoBufRecordReaderConfig)
+      throws IOException {
+    URI descriptorFileURI = protoBufRecordReaderConfig.getDescriptorFile();
+    if (descriptorFileURI == null) {
+      throw new IllegalArgumentException("descriptorFile must be set in ProtoBufRecordReaderConfig");
+    }
+    return ResourceFinder.openResource(descriptorFileURI);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _hasNext;
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  /**
+   * Reads the next length-delimited message into {@code reuse} and advances {@link #hasNext()}.
+   *
+   * @throws IOException if the message cannot be parsed
+   */
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    Message message;
+    try {
+      Message.Builder builder = _dynamicMessage.newBuilderForType();
+      builder.mergeDelimitedFrom(_inputStream);
+      message = builder.build();
+    } catch (Exception e) {
+      throw new IOException("Caught exception while reading protobuf object", e);
+    }
+    _recordExtractor.extract(message, reuse);
+    _hasNext = hasMoreToRead();
+    return reuse;
+  }
+
+  /** Restarts reading from the beginning of the data file. */
+  @Override
+  public void rewind()
+      throws IOException {
+    _inputStream.close();
+    init();
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _inputStream.close();
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderConfig.java
similarity index 66%
copy from 
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java
copy to 
pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderConfig.java
index 532fd16..8b3eaec 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderConfig.java
@@ -16,8 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.readers;
+package org.apache.pinot.plugin.inputformat.protobuf;
 
-public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, ORC, OTHER
+import java.net.URI;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * Configuration for {@link ProtoBufRecordReader}: points at the serialized descriptor set used to decode messages.
+ */
+public class ProtoBufRecordReaderConfig implements RecordReaderConfig {
+  private URI _descriptorFile;
+
+  /** Returns the URI of the descriptor set file, or {@code null} if not configured. */
+  public URI getDescriptorFile() {
+    return _descriptorFile;
+  }
+
+  /** Sets the URI of the descriptor set file. */
+  public void setDescriptorFile(URI descriptorFile) {
+    _descriptorFile = descriptorFile;
+  }
+
+  // The batch-ingestion jobs log this config; without toString() only an identity hash would be printed
+  @Override
+  public String toString() {
+    return "ProtoBufRecordReaderConfig{descriptorFile=" + _descriptorFile + "}";
+  }
 }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
new file mode 100644
index 0000000..ad30391
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
+  private final static Random RANDOM = new Random(System.currentTimeMillis());
+  private static final String PROTO_DATA = "_test_sample_proto_data.data";
+  private static final String DESCRIPTOR_FILE = "sample.desc";
+  private File _tempFile;
+  private RecordReader _recordReader;
+  private final static int SAMPLE_RECORDS_SIZE = 10000;
+
+  @Override
+  protected Schema getPinotSchema() {
+    return new Schema.SchemaBuilder().setSchemaName("SampleRecord")
+        .addSingleValueDimension("id", FieldSpec.DataType.INT)
+        .addSingleValueDimension("name", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("email", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("friends", FieldSpec.DataType.STRING).build();
+  }
+
+  private static List<Map<String, Object>> generateRandomRecords(Schema 
pinotSchema) {
+    List<Map<String, Object>> records = new ArrayList<>();
+
+    for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) {
+      Map<String, Object> recordMap = new HashMap<>();
+      for (FieldSpec fieldSpec : pinotSchema.getAllFieldSpecs()) {
+        recordMap.put(fieldSpec.getName(), generateRandomValue(fieldSpec));
+      }
+      records.add(recordMap);
+    }
+    return records;
+  }
+
+  private static Object generateRandomValue(FieldSpec fieldSpec) {
+    if (fieldSpec.isSingleValueField()) {
+      return generateRandomSingleValue(fieldSpec);
+    }
+    List list = new ArrayList();
+    int listSize = 1 + RANDOM.nextInt(50);
+    for (int i = 0; i < listSize; i++) {
+      list.add(generateRandomSingleValue(fieldSpec));
+    }
+    return list;
+  }
+
+  private static Object generateRandomSingleValue(FieldSpec fieldSpec) {
+    switch (fieldSpec.getDataType()) {
+      case INT:
+        return RANDOM.nextInt();
+      case LONG:
+        return RANDOM.nextLong();
+      case FLOAT:
+        return RANDOM.nextFloat();
+      case DOUBLE:
+        return RANDOM.nextDouble();
+      case STRING:
+        return RandomStringUtils.randomAscii(RANDOM.nextInt(50) + 1);
+      default:
+        throw new RuntimeException("Not supported fieldSpec - " + fieldSpec);
+    }
+  }
+
+  @BeforeClass
+  @Override
+  public void setUp()
+      throws Exception {
+    FileUtils.forceMkdir(_tempDir);
+    // Generate Pinot schema
+    _pinotSchema = getPinotSchema();
+    // Generate random records based on Pinot schema
+    _records = generateRandomRecords(_pinotSchema);
+    // Write generated random records to file
+    writeRecordsToFile(_records);
+    // Create and init RecordReader
+    _recordReader = createRecordReader();
+  }
+
+  @AfterClass
+  @Override
+  public void tearDown()
+      throws Exception {
+    FileUtils.forceDelete(_tempFile);
+  }
+
+  @Test
+  public void testRecordReader()
+      throws Exception {
+    checkValue(_recordReader, _records);
+    _recordReader.rewind();
+    checkValue(_recordReader, _records);
+  }
+
+  @Override
+  protected RecordReader createRecordReader()
+      throws Exception {
+    RecordReader recordReader = new ProtoBufRecordReader();
+    Set<String> sourceFields = getSourceFields(getPinotSchema());
+    recordReader.init(_tempFile, sourceFields, getProtoRecordReaderConfig());
+    return recordReader;
+  }
+
+  @Override
+  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
+      throws Exception {
+    List<Sample.SampleRecord> lists = new ArrayList<>();
+    for (Map<String, Object> record : recordsToWrite) {
+      Sample.SampleRecord sampleRecord =
+          Sample.SampleRecord.newBuilder().setEmail((String) 
record.get("email")).setName((String) record.get("name"))
+              .setId((Integer) record.get("id")).addAllFriends((List<String>) 
record.get("friends")).build();
+
+      lists.add(sampleRecord);
+    }
+
+    _tempFile = getSampleDataPath();
+    try (FileOutputStream output = new FileOutputStream(_tempFile, true)) {
+      for (Sample.SampleRecord record : lists) {
+        record.writeDelimitedTo(output);
+      }
+    }
+  }
+
+  private File getSampleDataPath()
+      throws IOException {
+    return File.createTempFile(ProtoBufRecordReaderTest.class.getName(), 
PROTO_DATA);
+  }
+
+  private ProtoBufRecordReaderConfig getProtoRecordReaderConfig()
+      throws URISyntaxException {
+    ProtoBufRecordReaderConfig config = new ProtoBufRecordReaderConfig();
+    URL descriptorFile = 
getClass().getClassLoader().getResource(DESCRIPTOR_FILE);
+    config.setDescriptorFile(descriptorFile.toURI());
+    return config;
+  }
+}
diff --git 
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/Sample.java
 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/Sample.java
new file mode 100644
index 0000000..86fdb5d
--- /dev/null
+++ 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/Sample.java
@@ -0,0 +1,1075 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: sample.proto
+
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+public final class Sample {
+  private Sample() {
+  }
+
+  public static void 
registerAllExtensions(com.google.protobuf.ExtensionRegistryLite registry) {
+  }
+
+  public static void 
registerAllExtensions(com.google.protobuf.ExtensionRegistry registry) {
+    registerAllExtensions((com.google.protobuf.ExtensionRegistryLite) 
registry);
+  }
+
+  public interface SampleRecordOrBuilder extends
+                                         // 
@@protoc_insertion_point(interface_extends:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord)
+                                             
com.google.protobuf.MessageOrBuilder {
+
+    /**
+     * <code>string name = 1;</code>
+     * @return The name.
+     */
+    java.lang.String getName();
+
+    /**
+     * <code>string name = 1;</code>
+     * @return The bytes for name.
+     */
+    com.google.protobuf.ByteString getNameBytes();
+
+    /**
+     * <code>int32 id = 2;</code>
+     * @return The id.
+     */
+    int getId();
+
+    /**
+     * <code>string email = 3;</code>
+     * @return The email.
+     */
+    java.lang.String getEmail();
+
+    /**
+     * <code>string email = 3;</code>
+     * @return The bytes for email.
+     */
+    com.google.protobuf.ByteString getEmailBytes();
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @return A list containing the friends.
+     */
+    java.util.List<java.lang.String> getFriendsList();
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @return The count of friends.
+     */
+    int getFriendsCount();
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @param index The index of the element to return.
+     * @return The friends at the given index.
+     */
+    java.lang.String getFriends(int index);
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @param index The index of the value to return.
+     * @return The bytes of the friends at the given index.
+     */
+    com.google.protobuf.ByteString getFriendsBytes(int index);
+  }
+
+  /**
+   * Protobuf type {@code 
org.apache.pinot.plugin.inputformat.protobuf.SampleRecord}
+   */
+  public static final class SampleRecord extends 
com.google.protobuf.GeneratedMessageV3 implements
+                                                                               
         // 
@@protoc_insertion_point(message_implements:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord)
+                                                                               
             SampleRecordOrBuilder {
+    private static final long serialVersionUID = 0L;
+
+    // Use SampleRecord.newBuilder() to construct.
+    private SampleRecord(com.google.protobuf.GeneratedMessageV3.Builder<?> 
builder) {
+      super(builder);
+    }
+
+    private SampleRecord() {
+      name_ = "";
+      email_ = "";
+      friends_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+    }
+
+    @java.lang.Override
+    @SuppressWarnings({"unused"})
+    protected java.lang.Object newInstance(UnusedPrivateParameter unused) {
+      return new SampleRecord();
+    }
+
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet getUnknownFields() {
+      return this.unknownFields;
+    }
+
+    private SampleRecord(com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      this();
+      if (extensionRegistry == null) {
+        throw new java.lang.NullPointerException();
+      }
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields = 
com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            case 10: {
+              java.lang.String s = input.readStringRequireUtf8();
+
+              name_ = s;
+              break;
+            }
+            case 16: {
+
+              id_ = input.readInt32();
+              break;
+            }
+            case 26: {
+              java.lang.String s = input.readStringRequireUtf8();
+
+              email_ = s;
+              break;
+            }
+            case 34: {
+              java.lang.String s = input.readStringRequireUtf8();
+              if (!((mutable_bitField0_ & 0x00000001) != 0)) {
+                friends_ = new com.google.protobuf.LazyStringArrayList();
+                mutable_bitField0_ |= 0x00000001;
+              }
+              friends_.add(s);
+              break;
+            }
+            default: {
+              if (!parseUnknownField(input, unknownFields, extensionRegistry, 
tag)) {
+                done = true;
+              }
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new 
com.google.protobuf.InvalidProtocolBufferException(e).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000001) != 0)) {
+          friends_ = friends_.getUnmodifiableView();
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+
+    public static final com.google.protobuf.Descriptors.Descriptor 
getDescriptor() {
+      return 
org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor;
+    }
+
+    @java.lang.Override
+    protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable 
internalGetFieldAccessorTable() {
+      return 
org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable
+          
.ensureFieldAccessorsInitialized(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.class,
+              
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.Builder.class);
+    }
+
+    public static final int NAME_FIELD_NUMBER = 1;
+    private volatile java.lang.Object name_;
+
+    /**
+     * <code>string name = 1;</code>
+     * @return The name.
+     */
+    public java.lang.String getName() {
+      java.lang.Object ref = name_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) 
ref;
+        java.lang.String s = bs.toStringUtf8();
+        name_ = s;
+        return s;
+      }
+    }
+
+    /**
+     * <code>string name = 1;</code>
+     * @return The bytes for name.
+     */
+    public com.google.protobuf.ByteString getNameBytes() {
+      java.lang.Object ref = name_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
+        name_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    public static final int ID_FIELD_NUMBER = 2;
+    private int id_;
+
+    /**
+     * <code>int32 id = 2;</code>
+     * @return The id.
+     */
+    public int getId() {
+      return id_;
+    }
+
+    public static final int EMAIL_FIELD_NUMBER = 3;
+    private volatile java.lang.Object email_;
+
+    /**
+     * <code>string email = 3;</code>
+     * @return The email.
+     */
+    public java.lang.String getEmail() {
+      java.lang.Object ref = email_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) 
ref;
+        java.lang.String s = bs.toStringUtf8();
+        email_ = s;
+        return s;
+      }
+    }
+
+    /**
+     * <code>string email = 3;</code>
+     * @return The bytes for email.
+     */
+    public com.google.protobuf.ByteString getEmailBytes() {
+      java.lang.Object ref = email_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
+        email_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    public static final int FRIENDS_FIELD_NUMBER = 4;
+    private com.google.protobuf.LazyStringList friends_;
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @return A list containing the friends.
+     */
+    public com.google.protobuf.ProtocolStringList getFriendsList() {
+      return friends_;
+    }
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @return The count of friends.
+     */
+    public int getFriendsCount() {
+      return friends_.size();
+    }
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @param index The index of the element to return.
+     * @return The friends at the given index.
+     */
+    public java.lang.String getFriends(int index) {
+      return friends_.get(index);
+    }
+
+    /**
+     * <code>repeated string friends = 4;</code>
+     * @param index The index of the value to return.
+     * @return The bytes of the friends at the given index.
+     */
+    public com.google.protobuf.ByteString getFriendsBytes(int index) {
+      return friends_.getByteString(index);
+    }
+
+    private byte memoizedIsInitialized = -1;
+
+    @java.lang.Override
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized == 1) {
+        return true;
+      }
+      if (isInitialized == 0) {
+        return false;
+      }
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    @java.lang.Override
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+        throws java.io.IOException {
+      if (!getNameBytes().isEmpty()) {
+        com.google.protobuf.GeneratedMessageV3.writeString(output, 1, name_);
+      }
+      if (id_ != 0) {
+        output.writeInt32(2, id_);
+      }
+      if (!getEmailBytes().isEmpty()) {
+        com.google.protobuf.GeneratedMessageV3.writeString(output, 3, email_);
+      }
+      for (int i = 0; i < friends_.size(); i++) {
+        com.google.protobuf.GeneratedMessageV3.writeString(output, 4, 
friends_.getRaw(i));
+      }
+      unknownFields.writeTo(output);
+    }
+
+    @java.lang.Override
+    public int getSerializedSize() {
+      int size = memoizedSize;
+      if (size != -1) {
+        return size;
+      }
+
+      size = 0;
+      if (!getNameBytes().isEmpty()) {
+        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, 
name_);
+      }
+      if (id_ != 0) {
+        size += com.google.protobuf.CodedOutputStream.computeInt32Size(2, id_);
+      }
+      if (!getEmailBytes().isEmpty()) {
+        size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, 
email_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < friends_.size(); i++) {
+          dataSize += computeStringSizeNoTag(friends_.getRaw(i));
+        }
+        size += dataSize;
+        size += 1 * getFriendsList().size();
+      }
+      size += unknownFields.getSerializedSize();
+      memoizedSize = size;
+      return size;
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord)) {
+        return super.equals(obj);
+      }
+      org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord other =
+          (org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) 
obj;
+
+      if (!getName().equals(other.getName())) {
+        return false;
+      }
+      if (getId() != other.getId()) {
+        return false;
+      }
+      if (!getEmail().equals(other.getEmail())) {
+        return false;
+      }
+      if (!getFriendsList().equals(other.getFriendsList())) {
+        return false;
+      }
+      if (!unknownFields.equals(other.unknownFields)) {
+        return false;
+      }
+      return true;
+    }
+
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptor().hashCode();
+      hash = (37 * hash) + NAME_FIELD_NUMBER;
+      hash = (53 * hash) + getName().hashCode();
+      hash = (37 * hash) + ID_FIELD_NUMBER;
+      hash = (53 * hash) + getId();
+      hash = (37 * hash) + EMAIL_FIELD_NUMBER;
+      hash = (53 * hash) + getEmail().hashCode();
+      if (getFriendsCount() > 0) {
+        hash = (37 * hash) + FRIENDS_FIELD_NUMBER;
+        hash = (53 * hash) + getFriendsList().hashCode();
+      }
+      hash = (29 * hash) + unknownFields.hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseFrom(java.nio.ByteBuffer data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseFrom(java.nio.ByteBuffer data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(
+        com.google.protobuf.ByteString data, 
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseFrom(byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return 
com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseFrom(java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return 
com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input, 
extensionRegistry);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseDelimitedFrom(
+        java.io.InputStream input)
+        throws java.io.IOException {
+      return 
com.google.protobuf.GeneratedMessageV3.parseDelimitedWithIOException(PARSER, 
input);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parseDelimitedFrom(
+        java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+        throws java.io.IOException {
+      return 
com.google.protobuf.GeneratedMessageV3.parseDelimitedWithIOException(PARSER, 
input, extensionRegistry);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return 
com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input);
+    }
+
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(
+        com.google.protobuf.CodedInputStream input, 
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return 
com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input, 
extensionRegistry);
+    }
+
+    @java.lang.Override
+    public Builder newBuilderForType() {
+      return newBuilder();
+    }
+
+    public static Builder newBuilder() {
+      return DEFAULT_INSTANCE.toBuilder();
+    }
+
+    public static Builder 
newBuilder(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
prototype) {
+      return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+    }
+
+    @java.lang.Override
+    public Builder toBuilder() {
+      return this == DEFAULT_INSTANCE ? new Builder() : new 
Builder().mergeFrom(this);
+    }
+
+    @java.lang.Override
+    protected Builder 
newBuilderForType(com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+
+    /**
+     * Protobuf type {@code 
org.apache.pinot.plugin.inputformat.protobuf.SampleRecord}
+     */
+    public static final class Builder extends 
com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
+                                                                               
                       // 
@@protoc_insertion_point(builder_implements:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord)
+                                                                               
                           
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecordOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor 
getDescriptor() {
+        return 
org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor;
+      }
+
+      @java.lang.Override
+      protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable 
internalGetFieldAccessorTable() {
+        return 
org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable
+            
.ensureFieldAccessorsInitialized(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.class,
+                
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.Builder.class);
+      }
+
+      // Construct using 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(com.google.protobuf.GeneratedMessageV3.BuilderParent 
parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders) {
+        }
+      }
+
+      @java.lang.Override
+      public Builder clear() {
+        super.clear();
+        name_ = "";
+
+        id_ = 0;
+
+        email_ = "";
+
+        friends_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      @java.lang.Override
+      public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() 
{
+        return 
org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor;
+      }
+
+      @java.lang.Override
+      public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
getDefaultInstanceForType() {
+        return 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.getDefaultInstance();
+      }
+
+      @java.lang.Override
+      public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
build() {
+        org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      @java.lang.Override
+      public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
buildPartial() {
+        org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
result =
+            new 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord(this);
+        int from_bitField0_ = bitField0_;
+        result.name_ = name_;
+        result.id_ = id_;
+        result.email_ = email_;
+        if (((bitField0_ & 0x00000001) != 0)) {
+          friends_ = friends_.getUnmodifiableView();
+          bitField0_ = (bitField0_ & ~0x00000001);
+        }
+        result.friends_ = friends_;
+        onBuilt();
+        return result;
+      }
+
+      @java.lang.Override
+      public Builder clone() {
+        return super.clone();
+      }
+
+      @java.lang.Override
+      public Builder setField(com.google.protobuf.Descriptors.FieldDescriptor 
field, java.lang.Object value) {
+        return super.setField(field, value);
+      }
+
+      @java.lang.Override
+      public Builder 
clearField(com.google.protobuf.Descriptors.FieldDescriptor field) {
+        return super.clearField(field);
+      }
+
+      @java.lang.Override
+      public Builder 
clearOneof(com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+        return super.clearOneof(oneof);
+      }
+
+      @java.lang.Override
+      public Builder 
setRepeatedField(com.google.protobuf.Descriptors.FieldDescriptor field, int 
index,
+          java.lang.Object value) {
+        return super.setRepeatedField(field, index, value);
+      }
+
+      @java.lang.Override
+      public Builder 
addRepeatedField(com.google.protobuf.Descriptors.FieldDescriptor field, 
java.lang.Object value) {
+        return super.addRepeatedField(field, value);
+      }
+
+      @java.lang.Override
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) {
+          return 
mergeFrom((org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) 
other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder 
mergeFrom(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
other) {
+        if (other == 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.getDefaultInstance())
 {
+          return this;
+        }
+        if (!other.getName().isEmpty()) {
+          name_ = other.name_;
+          onChanged();
+        }
+        if (other.getId() != 0) {
+          setId(other.getId());
+        }
+        if (!other.getEmail().isEmpty()) {
+          email_ = other.email_;
+          onChanged();
+        }
+        if (!other.friends_.isEmpty()) {
+          if (friends_.isEmpty()) {
+            friends_ = other.friends_;
+            bitField0_ = (bitField0_ & ~0x00000001);
+          } else {
+            ensureFriendsIsMutable();
+            friends_.addAll(other.friends_);
+          }
+          onChanged();
+        }
+        this.mergeUnknownFields(other.unknownFields);
+        onChanged();
+        return this;
+      }
+
+      @java.lang.Override
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      @java.lang.Override
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = 
(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) 
e.getUnfinishedMessage();
+          throw e.unwrapIOException();
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+
+      private int bitField0_;
+
+      private java.lang.Object name_ = "";
+
+      /**
+       * <code>string name = 1;</code>
+       * @return The name.
+       */
+      public java.lang.String getName() {
+        java.lang.Object ref = name_;
+        if (!(ref instanceof java.lang.String)) {
+          com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) 
ref;
+          java.lang.String s = bs.toStringUtf8();
+          name_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+
+      /**
+       * <code>string name = 1;</code>
+       * @return The bytes for name.
+       */
+      public com.google.protobuf.ByteString getNameBytes() {
+        java.lang.Object ref = name_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
+          name_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+
+      /**
+       * <code>string name = 1;</code>
+       * @param value The name to set.
+       * @return This builder for chaining.
+       */
+      public Builder setName(java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+
+        name_ = value;
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>string name = 1;</code>
+       * @return This builder for chaining.
+       */
+      public Builder clearName() {
+
+        name_ = getDefaultInstance().getName();
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>string name = 1;</code>
+       * @param value The bytes for name to set.
+       * @return This builder for chaining.
+       */
+      public Builder setNameBytes(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        checkByteStringIsUtf8(value);
+
+        name_ = value;
+        onChanged();
+        return this;
+      }
+
+      private int id_;
+
+      /**
+       * <code>int32 id = 2;</code>
+       * @return The id.
+       */
+      public int getId() {
+        return id_;
+      }
+
+      /**
+       * <code>int32 id = 2;</code>
+       * @param value The id to set.
+       * @return This builder for chaining.
+       */
+      public Builder setId(int value) {
+
+        id_ = value;
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>int32 id = 2;</code>
+       * @return This builder for chaining.
+       */
+      public Builder clearId() {
+
+        id_ = 0;
+        onChanged();
+        return this;
+      }
+
+      private java.lang.Object email_ = "";
+
+      /**
+       * <code>string email = 3;</code>
+       * @return The email.
+       */
+      public java.lang.String getEmail() {
+        java.lang.Object ref = email_;
+        if (!(ref instanceof java.lang.String)) {
+          com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) 
ref;
+          java.lang.String s = bs.toStringUtf8();
+          email_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+
+      /**
+       * <code>string email = 3;</code>
+       * @return The bytes for email.
+       */
+      public com.google.protobuf.ByteString getEmailBytes() {
+        java.lang.Object ref = email_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref);
+          email_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+
+      /**
+       * <code>string email = 3;</code>
+       * @param value The email to set.
+       * @return This builder for chaining.
+       */
+      public Builder setEmail(java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+
+        email_ = value;
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>string email = 3;</code>
+       * @return This builder for chaining.
+       */
+      public Builder clearEmail() {
+
+        email_ = getDefaultInstance().getEmail();
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>string email = 3;</code>
+       * @param value The bytes for email to set.
+       * @return This builder for chaining.
+       */
+      public Builder setEmailBytes(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        checkByteStringIsUtf8(value);
+
+        email_ = value;
+        onChanged();
+        return this;
+      }
+
+      private com.google.protobuf.LazyStringList friends_ = 
com.google.protobuf.LazyStringArrayList.EMPTY;
+
+      private void ensureFriendsIsMutable() {
+        if (!((bitField0_ & 0x00000001) != 0)) {
+          friends_ = new com.google.protobuf.LazyStringArrayList(friends_);
+          bitField0_ |= 0x00000001;
+        }
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @return A list containing the friends.
+       */
+      public com.google.protobuf.ProtocolStringList getFriendsList() {
+        return friends_.getUnmodifiableView();
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @return The count of friends.
+       */
+      public int getFriendsCount() {
+        return friends_.size();
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @param index The index of the element to return.
+       * @return The friends at the given index.
+       */
+      public java.lang.String getFriends(int index) {
+        return friends_.get(index);
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @param index The index of the value to return.
+       * @return The bytes of the friends at the given index.
+       */
+      public com.google.protobuf.ByteString getFriendsBytes(int index) {
+        return friends_.getByteString(index);
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @param index The index to set the value at.
+       * @param value The friends to set.
+       * @return This builder for chaining.
+       */
+      public Builder setFriends(int index, java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureFriendsIsMutable();
+        friends_.set(index, value);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @param value The friends to add.
+       * @return This builder for chaining.
+       */
+      public Builder addFriends(java.lang.String value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensureFriendsIsMutable();
+        friends_.add(value);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @param values The friends to add.
+       * @return This builder for chaining.
+       */
+      public Builder addAllFriends(java.lang.Iterable<java.lang.String> 
values) {
+        ensureFriendsIsMutable();
+        com.google.protobuf.AbstractMessageLite.Builder.addAll(values, 
friends_);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @return This builder for chaining.
+       */
+      public Builder clearFriends() {
+        friends_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        onChanged();
+        return this;
+      }
+
+      /**
+       * <code>repeated string friends = 4;</code>
+       * @param value The bytes of the friends to add.
+       * @return This builder for chaining.
+       */
+      public Builder addFriendsBytes(com.google.protobuf.ByteString value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        checkByteStringIsUtf8(value);
+        ensureFriendsIsMutable();
+        friends_.add(value);
+        onChanged();
+        return this;
+      }
+
+      @java.lang.Override
+      public final Builder setUnknownFields(final 
com.google.protobuf.UnknownFieldSet unknownFields) {
+        return super.setUnknownFields(unknownFields);
+      }
+
+      @java.lang.Override
+      public final Builder mergeUnknownFields(final 
com.google.protobuf.UnknownFieldSet unknownFields) {
+        return super.mergeUnknownFields(unknownFields);
+      }
+
+      // 
@@protoc_insertion_point(builder_scope:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord)
+    }
+
+    // @@protoc_insertion_point(class_scope:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord)
+    // Default instance, created once with the no-arg constructor during class initialization;
+    // returned by getDefaultInstance() and getDefaultInstanceForType().
+    private static final 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
DEFAULT_INSTANCE;
+
+    static {
+      DEFAULT_INSTANCE = new 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord();
+    }
+
+    /** Returns the shared default instance ({@code DEFAULT_INSTANCE}). */
+    public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
getDefaultInstance() {
+      return DEFAULT_INSTANCE;
+    }
+
+    // Single parser instance handed out by parser() and getParserForType(); every parse is
+    // delegated to the SampleRecord(CodedInputStream, ExtensionRegistryLite) constructor.
+    private static final com.google.protobuf.Parser<SampleRecord> PARSER =
+        new com.google.protobuf.AbstractParser<SampleRecord>() {
+          @java.lang.Override
+          public SampleRecord 
parsePartialFrom(com.google.protobuf.CodedInputStream input,
+              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+              throws com.google.protobuf.InvalidProtocolBufferException {
+            return new SampleRecord(input, extensionRegistry);
+          }
+        };
+
+    /** Returns the shared parser ({@code PARSER}) for {@code SampleRecord} messages. */
+    public static com.google.protobuf.Parser<SampleRecord> parser() {
+      return PARSER;
+    }
+
+    /** Instance-level accessor for the same shared {@code PARSER} that parser() returns. */
+    @java.lang.Override
+    public com.google.protobuf.Parser<SampleRecord> getParserForType() {
+      return PARSER;
+    }
+
+    /**
+     * Instance-level accessor for the same {@code DEFAULT_INSTANCE} that getDefaultInstance()
+     * returns.
+     */
+    @java.lang.Override
+    public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord 
getDefaultInstanceForType() {
+      return DEFAULT_INSTANCE;
+    }
+  }
+
+  // Message descriptor and field-accessor table for SampleRecord; both are assigned in the
+  // class's static initializer from the embedded sample.proto file descriptor.
+  private static final com.google.protobuf.Descriptors.Descriptor
+      
internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor;
+  private static final 
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      
internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable;
+
+  /** Returns the file descriptor for sample.proto, built in this class's static initializer. */
+  public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() 
{
+    return descriptor;
+  }
+
+  // File descriptor for sample.proto; assigned in the static block below, read by getDescriptor().
+  private static com.google.protobuf.Descriptors.FileDescriptor descriptor;
+
+  static {
+    // Descriptor bytes for sample.proto as embedded by protoc: the literal spells out the file
+    // name, the package org.apache.pinot.plugin.inputformat.protobuf, message SampleRecord with
+    // fields name/id/email/friends, and the proto3 syntax marker.
+    // NOTE(review): several lines of this block were wrapped by the mail archive (continuation
+    // lines lack the diff '+' prefix). The wrap inside the "name..." string literal is not
+    // cosmetic -- a raw line break in a Java string literal does not compile -- so take this
+    // code from the repository rather than from this message.
+    java.lang.String[] descriptorData =
+        {"\n\014sample.proto\022,org.apache.pinot.plugin." + 
"inputformat.protobuf\"H\n\014SampleRecord\022\014\n\004"
+            + "name\030\001 \001(\t\022\n\n\002id\030\002 
\001(\005\022\r\n\005email\030\003 \001(\t\022\017\n"
+            + "\007friends\030\004 \003(\tb\006proto3"};
+    // Built with an empty dependency array: sample.proto imports no other .proto files.
+    descriptor = com.google.protobuf.Descriptors.FileDescriptor
+        .internalBuildGeneratedFileFrom(descriptorData, new 
com.google.protobuf.Descriptors.FileDescriptor[]{});
+    // SampleRecord is the first (and only) message type declared in sample.proto.
+    
internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor
 =
+        getDescriptor().getMessageTypes().get(0);
+    // Accessor table keyed by the camel-cased field names, in field-declaration order.
+    
internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable
 =
+        new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+            
internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor,
+            new java.lang.String[]{"Name", "Id", "Email", "Friends",});
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.desc b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.desc
new file mode 100644
index 0000000..b863b5a
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.desc
@@ -0,0 +1,8 @@
+
+�
+sample.proto,org.apache.pinot.plugin.inputformat.protobuf"b
+SampleRecord
+name (     Rname
+id (Rid
+email (    Remail
+friends (  Rfriendsbproto3
\ No newline at end of file
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.proto b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.proto
new file mode 100644
index 0000000..e27377b
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+package org.apache.pinot.plugin.inputformat.protobuf;
+
+// Test-fixture schema. The generated Sample.java class and the compiled sample.desc descriptor
+// in this directory encode this same message, so regenerate both whenever it changes.
+message SampleRecord {
+  string name = 1;
+  int32 id = 2;
+  string email = 3;
+  repeated string friends = 4;
+}
diff --git a/pinot-plugins/pinot-input-format/pom.xml b/pinot-plugins/pinot-input-format/pom.xml
index bf7dd9c..629f396 100644
--- a/pinot-plugins/pinot-input-format/pom.xml
+++ b/pinot-plugins/pinot-input-format/pom.xml
@@ -48,6 +48,7 @@
     <module>pinot-parquet</module>
     <module>pinot-csv</module>
     <module>pinot-thrift</module>
+    <module>pinot-protobuf</module>
   </modules>
 
   <dependencies>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java
index 532fd16..6e62b29 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.spi.data.readers;
 
 public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, ORC, OTHER
+  // PROTO denotes Protocol Buffers input, read by the pinot-protobuf input-format plugin.
+  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, ORC, PROTO, OTHER
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceFinder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceFinder.java
new file mode 100644
index 0000000..7c5644e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceFinder.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+
+/**
+ * Utility methods that open a resource as an {@link InputStream}, either from the local file system or from the
+ * classpath of a {@link ClassLoader}.
+ *
+ * <p>Every stream returned here is owned by the caller, who is responsible for closing it.
+ */
+public class ResourceFinder {
+
+  /**
+   * Opens the local file identified by a {@code file:} URI.
+   *
+   * @param uri absolute, hierarchical {@code file:} URI of the file (see {@link File#File(URI)} for the exact rules)
+   * @return a new stream over the file contents; never {@code null}
+   * @throws IOException if the file does not exist, is a directory, or cannot be opened for reading
+   *     (a {@link FileNotFoundException})
+   * @throws IllegalArgumentException if {@code uri} is not a valid {@code file:} URI
+   */
+  public static InputStream openResource(URI uri)
+      throws IOException {
+    File file = new File(uri);
+    return new FileInputStream(file);
+  }
+
+  /**
+   * Opens a resource by path name. An absolute path is opened directly from the local file system; any other path is
+   * looked up on the classpath of {@code classLoader} via {@link #openResourceWithRelativePath(ClassLoader, String)}.
+   *
+   * @param classLoader class loader used for relative paths; not consulted (and may be {@code null}) when
+   *     {@code pathName} is absolute
+   * @param pathName absolute file-system path, or resource name relative to the classpath root
+   * @return a new stream over the resource contents; never {@code null}
+   * @throws FileNotFoundException if no file or classpath resource exists at {@code pathName}
+   * @throws IOException if the resource cannot be opened
+   * @throws java.nio.file.InvalidPathException if {@code pathName} is not a valid path on this platform
+   */
+  public static InputStream openResource(ClassLoader classLoader, String pathName)
+      throws IOException {
+    Path path = Paths.get(pathName);
+    if (path.isAbsolute()) {
+      return new FileInputStream(pathName);
+    } else {
+      return openResourceWithRelativePath(classLoader, pathName);
+    }
+  }
+
+  /**
+   * Opens a resource from the classpath of {@code classLoader}.
+   *
+   * <p>{@link ClassLoader#getResourceAsStream(String)} reports a missing resource by returning {@code null}. That is
+   * turned into a {@link FileNotFoundException} here, matching the absolute-path branch of
+   * {@link #openResource(ClassLoader, String)}. Callers therefore fail at the lookup, with the resource name in the
+   * message, instead of with a {@link NullPointerException} when they first use the stream.
+   *
+   * @param classLoader class loader whose resources are searched; must not be {@code null}
+   * @param pathName '/'-separated resource name, relative to the classpath root
+   * @return a new stream over the resource contents; never {@code null}
+   * @throws FileNotFoundException if the resource cannot be found or is not accessible through {@code classLoader}
+   */
+  public static InputStream openResourceWithRelativePath(ClassLoader classLoader, String pathName)
+      throws IOException {
+    InputStream resourceStream = classLoader.getResourceAsStream(pathName);
+    if (resourceStream == null) {
+      throw new FileNotFoundException("Resource not found on classpath: " + pathName);
+    }
+    return resourceStream;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to