This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2253bd717a Improving gz support for avro record readers (#9951)
2253bd717a is described below

commit 2253bd717a10c15220a1157316e233d11531e04b
Author: Seunghyun Lee <seungh...@startree.ai>
AuthorDate: Sun Dec 11 11:49:22 2022 -0800

    Improving gz support for avro record readers (#9951)
---
 .../pinot/plugin/inputformat/avro/AvroUtils.java   |   3 +-
 .../inputformat/avro/AvroRecordReaderTest.java     |   8 +-
 .../inputformat/csv/CSVRecordReaderTest.java       |  11 +-
 .../json/GzippedJSONRecordReaderTest.java          |  54 ------
 .../inputformat/json/JSONRecordReaderTest.java     |  12 +-
 .../plugin/inputformat/orc/ORCRecordReader.java    |   4 +-
 .../inputformat/orc/ORCRecordReaderTest.java       |  33 +---
 .../parquet/ParquetAvroRecordReader.java           |   4 +-
 .../parquet/ParquetNativeRecordReader.java         |   4 +-
 .../inputformat/parquet/ParquetRecordReader.java   |   4 +-
 .../parquet/ParquetNativeRecordReaderTest.java     |  10 +-
 .../parquet/ParquetRecordReaderTest.java           |  62 +------
 .../protobuf/ProtoBufRecordReaderTest.java         |  31 ++--
 .../inputformat/thrift/ThriftRecordReaderTest.java | 197 +++++++++++----------
 .../spi/data/readers/BaseRecordExtractor.java      |   3 +
 .../pinot/spi/data/readers/RecordReaderUtils.java  |  10 +-
 .../spi/data/readers/AbstractRecordReaderTest.java |  70 +++++++-
 17 files changed, 246 insertions(+), 274 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index 8c98f57a3f..7fbc087fcb 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -39,6 +39,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -214,7 +215,7 @@ public class AvroUtils {
    */
   public static DataFileStream<GenericRecord> getAvroReader(File avroFile)
       throws IOException {
-    if (avroFile.getName().endsWith(".gz")) {
+    if (RecordReaderUtils.isGZippedFile(avroFile)) {
       return new DataFileStream<>(new GZIPInputStream(new FileInputStream(avroFile)), new GenericDatumReader<>());
     } else {
       return new DataFileStream<>(new FileInputStream(avroFile), new GenericDatumReader<>());
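
With the hunk above, getAvroReader() detects gzip by sniffing the file's magic bytes (via the now-public RecordReaderUtils.isGZippedFile) instead of trusting a ".gz" suffix. A minimal usage sketch, assuming only the API shown in this hunk (the file path is hypothetical):

    import java.io.File;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.pinot.plugin.inputformat.avro.AvroUtils;

    public class GzipAwareAvroRead {
      public static void main(String[] args) throws Exception {
        // Gzipped Avro data whose name does NOT end in ".gz" is now read
        // correctly, because detection is content-based rather than name-based.
        File avroFile = new File("/tmp/data.avro.test");
        try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
          while (reader.hasNext()) {
            System.out.println(reader.next());
          }
        }
      }
    }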
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
index 3d075a6ee5..9d42e9ce96 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
@@ -33,10 +33,9 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 public class AvroRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.avro");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     AvroRecordReader avroRecordReader = new AvroRecordReader();
     avroRecordReader.init(_dataFile, _sourceFields, null);
@@ -59,4 +58,9 @@ public class AvroRecordReaderTest extends AbstractRecordReaderTest {
       }
     }
   }
+
+  @Override
+  protected String getDataFileName() {
+    return "data.avro";
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index e4b26d980d..0bbbcef26c 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -38,22 +38,20 @@ import org.testng.annotations.Test;
 
 public class CSVRecordReaderTest extends AbstractRecordReaderTest {
   private static final char CSV_MULTI_VALUE_DELIMITER = '\t';
-  private final File _dataFile = new File(_tempDir, "data.csv");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
     csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
     CSVRecordReader csvRecordReader = new CSVRecordReader();
-    csvRecordReader.init(_dataFile, _sourceFields, csvRecordReaderConfig);
+    csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig);
     return csvRecordReader;
   }
 
   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-
     Schema pinotSchema = getPinotSchema();
     String[] columns = pinotSchema.getColumnNames().toArray(new String[0]);
     try (FileWriter fileWriter = new FileWriter(_dataFile);
@@ -73,6 +71,11 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest {
     }
   }
 
+  @Override
+  protected String getDataFileName() {
+    return "data.csv";
+  }
+
   @Override
   protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap,
       List<Object[]> expectedPrimaryKeys)
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java
deleted file mode 100644
index 997c1c0495..0000000000
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.inputformat.json;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.GZIPOutputStream;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-/**
- * Unit test for {@link JSONRecordReader} for a Gzipped JSON file.
- * Relies on the {@link JSONRecordReaderTest} for actual tests, by simply overriding
- * the JSON file generation to generate a gzipped JSON file.
- */
-public class GzippedJSONRecordReaderTest extends JSONRecordReaderTest {
-  private final File _dateFile = new File(_tempDir, "data.json");
-
-  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
-      throws Exception {
-    try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(_dateFile)),
-        StandardCharsets.UTF_8)) {
-      for (Map<String, Object> r : recordsToWrite) {
-        ObjectNode jsonRecord = JsonUtils.newObjectNode();
-        for (String key : r.keySet()) {
-          jsonRecord.set(key, JsonUtils.objectToJsonNode(r.get(key)));
-        }
-        writer.write(jsonRecord.toString());
-      }
-    }
-  }
-}
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
index 81ab49983c..82f83e1779 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
@@ -33,20 +33,19 @@ import org.testng.Assert;
 
 
 public class JSONRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dateFile = new File(_tempDir, "data.json");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     JSONRecordReader recordReader = new JSONRecordReader();
-    recordReader.init(_dateFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }
 
   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-    try (FileWriter fileWriter = new FileWriter(_dateFile)) {
+    try (FileWriter fileWriter = new FileWriter(_dataFile)) {
       for (Map<String, Object> r : recordsToWrite) {
         ObjectNode jsonRecord = JsonUtils.newObjectNode();
         for (String key : r.keySet()) {
@@ -57,6 +56,11 @@ public class JSONRecordReaderTest extends AbstractRecordReaderTest {
     }
   }
 
+  @Override
+  protected String getDataFileName() {
+    return "data.json";
+  }
+
   @Override
   protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap,
       List<Object[]> expectedPrimaryKeys)
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index f98ab58921..8a4d3fd709 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -66,6 +66,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * </ul>
  */
 public class ORCRecordReader implements RecordReader {
+  private static final String EXTENSION = "orc";
+
   private List<String> _orcFields;
   private List<TypeDescription> _orcFieldTypes;
   private boolean[] _includeOrcFields;
@@ -78,7 +80,7 @@ public class ORCRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     Configuration configuration = new Configuration();
-    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc");
+    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
         OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
     TypeDescription orcSchema = orcReader.getSchema();
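
The reader now names its extension once and delegates decompression to RecordReaderUtils.unpackIfRequired, whose target-name rule changes later in this diff. As a standalone sketch of that rule (the helper name here is illustrative, not part of the patch):

    public class UnpackNaming {
      // Mirrors the logic added in the RecordReaderUtils hunk below: strip a
      // trailing ".gz" when present, otherwise append the reader's extension.
      static String targetName(String fileName, String extension) {
        if (fileName.endsWith(".gz")) {
          return fileName.substring(0, fileName.length() - 3);
        }
        return fileName + "." + extension;
      }

      public static void main(String[] args) {
        System.out.println(targetName("data.orc.gz", "orc")); // data.orc
        System.out.println(targetName("data", "orc"));        // data.orc
      }
    }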
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index dd9b12a78c..c618b35285 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -17,15 +17,9 @@ package org.apache.pinot.plugin.inputformat.orc;
  * specific language governing permissions and limitations
  * under the License.
  */
-
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -38,26 +32,17 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.annotations.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 public class ORCRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.orc");
-
-  private void compressGzip(String sourcePath, String targetPath)
-      throws IOException {
-    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
-      Files.copy(Paths.get(sourcePath), gos);
-    }
-  }
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(_dataFile, _sourceFields, null);
+    orcRecordReader.init(file, _sourceFields, null);
     return orcRecordReader;
   }
 
@@ -158,16 +143,8 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest {
     writer.close();
   }
 
-  @Test
-  public void testGzipORCRecordReader()
-      throws Exception {
-    String gzipFileName = "data.orc.gz";
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName));
-    final File gzDataFile = new File(_tempDir, gzipFileName);
-    ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(gzDataFile, _sourceFields, null);
-    checkValue(orcRecordReader, _records, _primaryKeys);
-    orcRecordReader.rewind();
-    checkValue(orcRecordReader, _records, _primaryKeys);
+  @Override
+  protected String getDataFileName() {
+    return "data.orc";
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 787ddc25e7..8caf384630 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -41,6 +41,8 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 *   https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a>
  */
 public class ParquetAvroRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private Path _dataFilePath;
   private AvroRecordExtractor _recordExtractor;
   private ParquetReader<GenericRecord> _parquetReader;
@@ -49,7 +51,7 @@ public class ParquetAvroRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index da89c8a382..247bd6c82a 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
  * Record reader for Native Parquet file.
  */
 public class ParquetNativeRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private Path _dataFilePath;
   private ParquetNativeRecordExtractor _recordExtractor;
   private MessageType _schema;
@@ -59,7 +61,7 @@ public class ParquetNativeRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 60886b3b30..0b119baad3 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -34,13 +34,15 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}.
  */
 public class ParquetRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private RecordReader _internalParquetRecordReader;
   private boolean _useAvroParquetRecordReader = true;
 
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
       _internalParquetRecordReader = new ParquetAvroRecordReader();
     } else if (recordReaderConfig != null
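
ParquetRecordReader unpacks gzipped input up front and then picks its internal implementation from the config; per the test fix below, a null config defaults to the Avro reader unless the file metadata carries no Avro schema. A hedged usage sketch (the path is hypothetical):

    import java.io.File;
    import org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader;

    public class ParquetReaderSelection {
      public static void main(String[] args) throws Exception {
        ParquetRecordReader reader = new ParquetRecordReader();
        // Gzipped input is unpacked first via RecordReaderUtils.unpackIfRequired.
        reader.init(new File("/tmp/data.parquet.gz"), null, null);
        System.out.println("Using Avro reader: " + reader.useAvroParquetRecordReader());
      }
    }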
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
index 378195d976..58177ee38c 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -34,13 +34,12 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.parquet");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader();
-    recordReader.init(_dataFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }
 
@@ -63,4 +62,9 @@ public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
       }
     }
   }
+
+  @Override
+  protected String getDataFileName() {
+    return "data.parquet";
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 345cb1cdbd..78e96203ce 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -20,14 +20,10 @@ package org.apache.pinot.plugin.inputformat.parquet;
 
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.GZIPOutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -43,16 +39,16 @@ import org.testng.annotations.Test;
 
 
 public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.parquet");
-  private final String _gzipFileName = "data.parquet.gz";
   private final File _testParquetFileWithInt96AndDecimal =
       new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
 
+  private static final int NUM_RECORDS_TEST_PARQUET_WITH_INT96 = 1965;
+
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ParquetRecordReader recordReader = new ParquetRecordReader();
-    recordReader.init(_dataFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }
 
@@ -76,11 +72,9 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     }
   }
 
-  private void compressGzip(String sourcePath, String targetPath)
-      throws IOException {
-    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
-      Files.copy(Paths.get(sourcePath), gos);
-    }
+  @Override
+  protected String getDataFileName() {
+    return "data.parquet";
   }
 
   @Test
@@ -122,9 +116,9 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
 
     final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
     File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile());
-    parquetRecordReader.init(nativeParquetFile, null, null);
+    parquetRecordReader2.init(nativeParquetFile, null, null);
     // Should be native since file metadata does not have avro schema
-    Assert.assertFalse(parquetRecordReader.useAvroParquetRecordReader());
+    Assert.assertFalse(parquetRecordReader2.useAvroParquetRecordReader());
   }
 
   @Test
@@ -168,42 +162,4 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     Assert.assertEquals(recordsRead, totalRecords,
         "Message read from ParquetRecordReader doesn't match the expected number.");
   }
-
-  @Test
-  public void testGzipParquetRecordReader()
-      throws IOException {
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
-    final File gzDataFile = new File(_tempDir, _gzipFileName);
-    ParquetRecordReader recordReader = new ParquetRecordReader();
-    recordReader.init(gzDataFile, _sourceFields, null);
-    testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE);
-  }
-
-  @Test
-  public void testGzipParquetAvroRecordReader()
-      throws IOException {
-    ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
-    final File gzDataFile = new File(_tempDir, _gzipFileName);
-    avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig());
-    testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
-  }
-
-  @Test
-  public void testGzipParquetNativeRecordReader()
-      throws IOException {
-    ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();
-
-    final String gzParquetFileWithInt96AndDecimal =
-        String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getAbsolutePath());
-    compressGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzParquetFileWithInt96AndDecimal);
-    final File gzTestParquetFileWithInt96AndDecimal = new File(gzParquetFileWithInt96AndDecimal);
-    nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
-    testReadParquetFile(nativeRecordReader, 1965);
-
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
-    final File gzDataFile = new File(_tempDir, _gzipFileName);
-    nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
-    testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
-  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
index ce9cd1b346..eac590a803 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.plugin.inputformat.protobuf;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -35,7 +34,6 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -44,7 +42,7 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
   private final static Random RANDOM = new Random(System.currentTimeMillis());
   private static final String PROTO_DATA = "_test_sample_proto_data.data";
   private static final String DESCRIPTOR_FILE = "sample.desc";
-  private File _tempFile;
+  private File _dataFile;
   private RecordReader _recordReader;
   private final static int SAMPLE_RECORDS_SIZE = 10000;
 
@@ -57,7 +55,7 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
         .addMultiValueDimension("friends", FieldSpec.DataType.STRING).build();
   }
 
-  private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
+  protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
     List<Map<String, Object>> records = new ArrayList<>();
 
     for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) {
@@ -115,13 +113,6 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
     _recordReader = createRecordReader();
   }
 
-  @AfterClass
-  @Override
-  public void tearDown()
-      throws Exception {
-    FileUtils.forceDelete(_tempFile);
-  }
-
   @Test
   public void testRecordReader()
       throws Exception {
@@ -131,11 +122,11 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
   }
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     RecordReader recordReader = new ProtoBufRecordReader();
     Set<String> sourceFields = getSourceFields(getPinotSchema());
-    recordReader.init(_tempFile, sourceFields, getProtoRecordReaderConfig());
+    recordReader.init(file.getAbsoluteFile(), sourceFields, getProtoRecordReaderConfig());
     return recordReader;
   }
 
@@ -151,17 +142,21 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
       lists.add(sampleRecord);
     }
 
-    _tempFile = getSampleDataPath();
-    try (FileOutputStream output = new FileOutputStream(_tempFile, true)) {
+    _dataFile = getSampleDataPath();
+    try (FileOutputStream output = new FileOutputStream(_dataFile, true)) {
       for (Sample.SampleRecord record : lists) {
         record.writeDelimitedTo(output);
       }
     }
   }
 
-  private File getSampleDataPath()
-      throws IOException {
-    return File.createTempFile(ProtoBufRecordReaderTest.class.getName(), PROTO_DATA);
+  @Override
+  protected String getDataFileName() {
+    return PROTO_DATA;
+  }
+
+  private File getSampleDataPath() {
+    return new File(_tempDir, PROTO_DATA);
   }
 
   private ProtoBufRecordReaderConfig getProtoRecordReaderConfig()
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
index 085fbf9414..167a210bcf 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.plugin.inputformat.thrift;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,116 +32,79 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 
 /**
 * Test {@code org.apache.pinot.plugin.inputformat.thrift.data.ThriftRecordReader} for a given sample thrift
  * data.
  */
-public class ThriftRecordReaderTest {
+public class ThriftRecordReaderTest extends AbstractRecordReaderTest {
   private static final String THRIFT_DATA = "_test_sample_thrift_data.thrift";
 
-  private File _tempFile;
+  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
+    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
+    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData");
+    return config;
+  }
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    FileUtils.deleteQuietly(_tempFile);
-
-    ThriftSampleData t1 = new ThriftSampleData();
-    t1.setActive(true);
-    t1.setCreated_at(1515541280L);
-    t1.setId(1);
-    t1.setName("name1");
-    List<Short> t1Groups = new ArrayList<>(2);
-    t1Groups.add((short) 1);
-    t1Groups.add((short) 4);
-    t1.setGroups(t1Groups);
-    Map<String, Long> mapValues = new HashMap<>();
-    mapValues.put("name1", 1L);
-    t1.setMap_values(mapValues);
-    Set<String> namesSet = new HashSet<>();
-    namesSet.add("name1");
-    t1.setSet_values(namesSet);
-
-    ThriftSampleData t2 = new ThriftSampleData();
-    t2.setActive(false);
-    t2.setCreated_at(1515541290L);
-    t2.setId(2);
-    t2.setName("name2");
-    List<Short> t2Groups = new ArrayList<>(2);
-    t2Groups.add((short) 2);
-    t2Groups.add((short) 3);
-    t2.setGroups(t2Groups);
-    List<ThriftSampleData> lists = new ArrayList<>(2);
-    lists.add(t1);
-    lists.add(t2);
-    TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory());
-    _tempFile = getSampleDataPath();
-    FileWriter writer = new FileWriter(_tempFile);
-    for (ThriftSampleData d : lists) {
-      IOUtils.write(binarySerializer.serialize(d), writer);
-    }
-    writer.close();
-  }
-
-  @Test
-  public void testReadData()
-      throws IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
-    List<GenericRow> genericRows = new ArrayList<>();
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-    recordReader.close();
-    Assert.assertEquals(genericRows.size(), 2, "The number of rows return is incorrect");
-    int id = 1;
-    for (GenericRow outputRow : genericRows) {
-      Assert.assertEquals(outputRow.getValue("id"), id);
-      Assert.assertNull(outputRow.getValue("map_values"));
-      id++;
+    if (_tempDir.exists()) {
+      FileUtils.cleanDirectory(_tempDir);
     }
+    FileUtils.forceMkdir(_tempDir);
+    // Generate Pinot schema
+    _pinotSchema = getPinotSchema();
+    _sourceFields = getSourceFields(_pinotSchema);
+    // Generate random records based on Pinot schema
+    _records = generateRandomRecords(_pinotSchema);
+    _primaryKeys = generatePrimaryKeys(_records, getPrimaryKeyColumns());
+    // Write generated random records to file
+    writeRecordsToFile(_records);
+    // Create and init RecordReader
+    _recordReader = createRecordReader();
   }
 
-  @Test
-  public void testRewind()
-      throws IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
-    List<GenericRow> genericRows = new ArrayList<>();
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-
-    recordReader.rewind();
-
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-    recordReader.close();
-    Assert.assertEquals(genericRows.size(), 4, "The number of rows return after the rewind is incorrect");
+  protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
+    // TODO: instead of hardcoding some rows, change this to work with the AbstractRecordReader's random value generator
+    List<Map<String, Object>> records = new ArrayList<>();
+    Map<String, Object> record1 = new HashMap<>();
+    record1.put("active", "true");
+    record1.put("created_at", 1515541280L);
+    record1.put("id", 1);
+    List<Integer> groups1 = new ArrayList<>();
+    groups1.add(1);
+    groups1.add(4);
+    record1.put("groups", groups1);
+    Map<String, Long> mapValues1 = new HashMap<>();
+    mapValues1.put("name1", 1L);
+    record1.put("map_values", mapValues1);
+    List<String> setValues1 = new ArrayList<>();
+    setValues1.add("name1");
+    record1.put("set_values", setValues1);
+    records.add(record1);
+
+    Map<String, Object> record2 = new HashMap<>();
+    record2.put("active", "false");
+    record2.put("created_at", 1515541290L);
+    record2.put("id", 1);
+    List<Integer> groups2 = new ArrayList<>();
+    groups2.add(2);
+    groups2.add(3);
+    record2.put("groups", groups2);
+    records.add(record2);
+
+    return records;
   }
 
-  private File getSampleDataPath()
-      throws IOException {
-    return File.createTempFile(ThriftRecordReaderTest.class.getName(), THRIFT_DATA);
-  }
-
-  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
-    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
-    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData");
-    return config;
-  }
-
-  private Schema getSchema() {
+  @Override
+  protected org.apache.pinot.spi.data.Schema getPinotSchema() {
     return new Schema.SchemaBuilder().setSchemaName("ThriftSampleData")
         .addSingleValueDimension("id", FieldSpec.DataType.INT)
         .addSingleValueDimension("name", FieldSpec.DataType.STRING)
@@ -155,8 +118,54 @@ public class ThriftRecordReaderTest {
     return Sets.newHashSet("id", "name", "created_at", "active", "groups", "set_values");
   }
 
-  @AfterClass
-  public void tearDown() {
-    FileUtils.deleteQuietly(_tempFile);
+  @Override
+  protected RecordReader createRecordReader(File file)
+      throws Exception {
+    ThriftRecordReader recordReader = new ThriftRecordReader();
+    recordReader.init(file, getSourceFields(), getThriftRecordReaderConfig());
+    return recordReader;
+  }
+
+  @Override
+  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
+      throws Exception {
+    List<ThriftSampleData> dataList = new ArrayList<>(recordsToWrite.size());
+    for (Map<String, Object> record : recordsToWrite) {
+      ThriftSampleData data = new ThriftSampleData();
+      data.setActive(Boolean.parseBoolean(record.get("active").toString()));
+      data.setCreated_at(Math.abs(((Long) record.get("created_at")).longValue()));
+      int i = Math.abs(((Integer) record.get("id")).intValue());
+      data.setId(i);
+      data.setName((String) record.get("name"));
+      List<Integer> groupsList = (List<Integer>) record.get("groups");
+      if (groupsList != null) {
+        List<Short> groupsResult = new ArrayList<>(groupsList.size());
+        for (Integer num : groupsList) {
+          groupsResult.add(num.shortValue());
+        }
+        data.setGroups(groupsResult);
+      }
+      List<String> setValuesList = (List<String>) record.get("set_values");
+      if (setValuesList != null) {
+        Set<String> setValuesResult = new HashSet<>(setValuesList.size());
+        for (String s : setValuesList) {
+          setValuesResult.add(s);
+        }
+        data.setSet_values(setValuesResult);
+      }
+      dataList.add(data);
+    }
+
+    TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory());
+    FileWriter writer = new FileWriter(_dataFile);
+    for (ThriftSampleData d : dataList) {
+      IOUtils.write(binarySerializer.serialize(d), writer, Charset.defaultCharset());
+    }
+    writer.close();
+  }
+
+  @Override
+  protected String getDataFileName() {
+    return THRIFT_DATA;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
index 90a29dbd5d..f4efdb6d2d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
@@ -184,6 +184,9 @@ public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
       return bytesValue;
     }
     if (value instanceof Number || value instanceof byte[]) {
+      if (value instanceof Short) {
+        return Integer.valueOf(value.toString());
+      }
       return value;
     }
     return value.toString();
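
The new branch widens Short values to Integer before they reach the row; this matters because the refactored Thrift test now routes its i16 "groups" values through this extractor, and Pinot's DataType offers no 16-bit integer type. A self-contained sketch of the resulting conversion rule:

    public class ShortWidening {
      // Mirrors the method body above: Shorts are widened to Integer, other
      // Numbers and byte[] pass through, and everything else is stringified.
      static Object convert(Object value) {
        if (value instanceof Number || value instanceof byte[]) {
          if (value instanceof Short) {
            return Integer.valueOf(value.toString());
          }
          return value;
        }
        return value.toString();
      }

      public static void main(String[] args) {
        System.out.println(convert((short) 4).getClass()); // class java.lang.Integer
      }
    }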
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
index fd84c5f306..780eeeb5ce 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
@@ -65,7 +65,13 @@ public class RecordReaderUtils {
       throws IOException {
     if (isGZippedFile(dataFile)) {
       try (final InputStream inputStream = getInputStream(dataFile)) {
-        File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension));
+        String targetFileName = dataFile.getName();
+        if (targetFileName.endsWith(".gz")) {
+          targetFileName = targetFileName.substring(0, targetFileName.length() - 3);
+        } else {
+          targetFileName = targetFileName + "." + extension;
+        }
+        File targetFile = new File(dataFile.getParentFile(), targetFileName);
         Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
         return targetFile;
       }
@@ -74,7 +80,7 @@ public class RecordReaderUtils {
     }
   }
 
-  private static boolean isGZippedFile(File file)
+  public static boolean isGZippedFile(File file)
       throws IOException {
     int magic = 0;
     try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
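
isGZippedFile is made public so AvroUtils can share it. It reads the first two bytes and compares them to the gzip magic number; a self-contained sketch of that check, assuming the read pattern begun in the hunk above:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.zip.GZIPInputStream;

    public class GzipMagicCheck {
      // A gzip stream starts with bytes 0x1f 0x8b; assembled little-endian this
      // is 0x8b1f, which the JDK exposes as GZIPInputStream.GZIP_MAGIC.
      static boolean looksGzipped(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
          int magic = raf.read() & 0xff | ((raf.read() << 8) & 0xff00);
          return magic == GZIPInputStream.GZIP_MAGIC;
        }
      }
    }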
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index b4821bb54c..de587b3e10 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -20,12 +20,17 @@ package org.apache.pinot.spi.data.readers;
 
 import com.google.common.collect.Sets;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.zip.GZIPOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -42,13 +47,14 @@ public abstract class AbstractRecordReaderTest {
   protected final static int SAMPLE_RECORDS_SIZE = 10000;
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
+  protected final File _dataFile = new File(_tempDir, getDataFileName());
   protected List<Map<String, Object>> _records;
   protected List<Object[]> _primaryKeys;
   protected org.apache.pinot.spi.data.Schema _pinotSchema;
   protected Set<String> _sourceFields;
-  private RecordReader _recordReader;
+  protected RecordReader _recordReader;
 
-  private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
+  protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
     List<Map<String, Object>> records = new ArrayList<>();
 
     for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) {
@@ -99,6 +105,8 @@ public abstract class AbstractRecordReaderTest {
         return RANDOM.nextDouble();
       case STRING:
         return RandomStringUtils.randomAscii(RANDOM.nextInt(50) + 1);
+      case BOOLEAN:
+        return RANDOM.nextBoolean();
       default:
         throw new RuntimeException("Not supported fieldSpec - " + fieldSpec);
     }
@@ -117,9 +125,11 @@ public abstract class AbstractRecordReaderTest {
         } else {
           Object[] actualRecords = (Object[]) actualRecord.getValue(fieldSpecName);
           List expectedRecords = (List) expectedRecord.get(fieldSpecName);
-          Assert.assertEquals(actualRecords.length, expectedRecords.size());
-          for (int j = 0; j < actualRecords.length; j++) {
-            Assert.assertEquals(actualRecords[j], expectedRecords.get(j));
+          if (expectedRecords != null) {
+            Assert.assertEquals(actualRecords.length, expectedRecords.size());
+            for (int j = 0; j < actualRecords.length; j++) {
+              Assert.assertEquals(actualRecords[j], expectedRecords.get(j));
+            }
           }
         }
       }
@@ -155,6 +165,14 @@ public abstract class AbstractRecordReaderTest {
     return sourceFields;
   }
 
+  protected File compressFileWithGzip(String sourcePath, String targetPath)
+      throws IOException {
+    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+      Files.copy(Paths.get(sourcePath), gos);
+      return new File(targetPath);
+    }
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -188,17 +206,55 @@ public abstract class AbstractRecordReaderTest {
     checkValue(_recordReader, _records, _primaryKeys);
   }
 
+  @Test
+  public void testGzipRecordReader()
+      throws Exception {
+    // Test Gzipped Avro file that ends with ".gz"
+    File gzipDataFile = new File(_tempDir, _dataFile.getName() + ".gz");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile.getAbsolutePath());
+    RecordReader recordReader = createRecordReader(gzipDataFile);
+    checkValue(recordReader, _records, _primaryKeys);
+    recordReader.rewind();
+    checkValue(recordReader, _records, _primaryKeys);
+
+    // Test Gzipped Avro file that doesn't end with '.gz'.
+    File gzipDataFile2 = new File(_tempDir, _dataFile.getName() + ".test");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile2.getAbsolutePath());
+    recordReader = createRecordReader(gzipDataFile2);
+    checkValue(recordReader, _records, _primaryKeys);
+    recordReader.rewind();
+    checkValue(recordReader, _records, _primaryKeys);
+  }
+
   /**
-   * @return an implementation of RecordReader
+   * Create the record reader given a file
+   *
+   * @param file input file
+   * @return an implementation of RecordReader of the given file
    * @throws Exception
    */
-  protected abstract RecordReader createRecordReader()
+  protected abstract RecordReader createRecordReader(File file)
       throws Exception;
 
+  /**
+   * @return an implementation of RecordReader
+   * @throws Exception
+   */
+  protected RecordReader createRecordReader()
+      throws Exception {
+    return createRecordReader(_dataFile);
+  }
+
   /**
    * Write records into a file
    * @throws Exception
    */
   protected abstract void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception;
+
+  /**
+   * Get data file name
+   * @throws Exception
+   */
+  protected abstract String getDataFileName();
 }
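
The shared testGzipRecordReader() above compresses each format's data file twice, once with a ".gz" suffix and once without, so every subclass now exercises both the name-based and the magic-byte detection paths. A standalone demo of the gzip copy that compressFileWithGzip performs (paths hypothetical):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.zip.GZIPOutputStream;

    public class GzipCopy {
      public static void main(String[] args) throws IOException {
        String source = "/tmp/data.json";      // e.g. a file written by a test
        String target = "/tmp/data.json.test"; // gzipped but deliberately not ".gz"-suffixed
        try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(target))) {
          Files.copy(Paths.get(source), gos);
        }
      }
    }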

