This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 2253bd717a  Improving gz support for avro record readers (#9951)
2253bd717a is described below

commit 2253bd717a10c15220a1157316e233d11531e04b
Author: Seunghyun Lee <seungh...@startree.ai>
AuthorDate: Sun Dec 11 11:49:22 2022 -0800

    Improving gz support for avro record readers (#9951)
---
 .../pinot/plugin/inputformat/avro/AvroUtils.java   |   3 +-
 .../inputformat/avro/AvroRecordReaderTest.java     |   8 +-
 .../inputformat/csv/CSVRecordReaderTest.java       |  11 +-
 .../json/GzippedJSONRecordReaderTest.java          |  54 ------
 .../inputformat/json/JSONRecordReaderTest.java     |  12 +-
 .../plugin/inputformat/orc/ORCRecordReader.java    |   4 +-
 .../inputformat/orc/ORCRecordReaderTest.java       |  33 +---
 .../parquet/ParquetAvroRecordReader.java           |   4 +-
 .../parquet/ParquetNativeRecordReader.java         |   4 +-
 .../inputformat/parquet/ParquetRecordReader.java   |   4 +-
 .../parquet/ParquetNativeRecordReaderTest.java     |  10 +-
 .../parquet/ParquetRecordReaderTest.java           |  62 +------
 .../protobuf/ProtoBufRecordReaderTest.java         |  31 ++--
 .../inputformat/thrift/ThriftRecordReaderTest.java | 197 +++++++++++----------
 .../spi/data/readers/BaseRecordExtractor.java      |   3 +
 .../pinot/spi/data/readers/RecordReaderUtils.java  |  10 +-
 .../spi/data/readers/AbstractRecordReaderTest.java |  70 +++++++-
 17 files changed, 246 insertions(+), 274 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index 8c98f57a3f..7fbc087fcb 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -39,6 +39,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;


 /**
@@ -214,7 +215,7 @@ public class AvroUtils {
    */
   public static DataFileStream<GenericRecord> getAvroReader(File avroFile)
       throws IOException {
-    if (avroFile.getName().endsWith(".gz")) {
+    if (RecordReaderUtils.isGZippedFile(avroFile)) {
       return new DataFileStream<>(new GZIPInputStream(new FileInputStream(avroFile)), new GenericDatumReader<>());
     } else {
       return new DataFileStream<>(new FileInputStream(avroFile), new GenericDatumReader<>());

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
index 3d075a6ee5..9d42e9ce96 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
@@ -33,10 +33,9 @@ import org.apache.pinot.spi.data.readers.RecordReader;


 public class AvroRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.avro");

   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
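The AvroUtils hunk above is the core of the change: getAvroReader no longer keys off a ".gz" file-name suffix but asks RecordReaderUtils.isGZippedFile, which this commit makes public (see the RecordReaderUtils hunk further down). A check of that kind sniffs the two gzip magic bytes at the start of the file, so a gzipped input is recognized no matter how it is named. A minimal self-contained sketch, assuming the usual magic-byte approach (the wrapper class name is invented; only part of the real method body is visible in this diff):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.zip.GZIPInputStream;

    public final class GzipDetectionSketch {
      // Returns true when the file starts with the gzip magic bytes 0x1f 0x8b,
      // regardless of the file's extension.
      public static boolean isGZippedFile(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
          // Read the first two bytes little-endian: 0x1f 0x8b becomes 0x8b1f.
          int magic = raf.read() & 0xff | ((raf.read() << 8) & 0xff00);
          return magic == GZIPInputStream.GZIP_MAGIC;
        }
      }
    }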
AvroRecordReader avroRecordReader = new AvroRecordReader(); avroRecordReader.init(_dataFile, _sourceFields, null); @@ -59,4 +58,9 @@ public class AvroRecordReaderTest extends AbstractRecordReaderTest { } } } + + @Override + protected String getDataFileName() { + return "data.avro"; + } } diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java index e4b26d980d..0bbbcef26c 100644 --- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java @@ -38,22 +38,20 @@ import org.testng.annotations.Test; public class CSVRecordReaderTest extends AbstractRecordReaderTest { private static final char CSV_MULTI_VALUE_DELIMITER = '\t'; - private final File _dataFile = new File(_tempDir, "data.csv"); @Override - protected RecordReader createRecordReader() + protected RecordReader createRecordReader(File file) throws Exception { CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig(); csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER); CSVRecordReader csvRecordReader = new CSVRecordReader(); - csvRecordReader.init(_dataFile, _sourceFields, csvRecordReaderConfig); + csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig); return csvRecordReader; } @Override protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) throws Exception { - Schema pinotSchema = getPinotSchema(); String[] columns = pinotSchema.getColumnNames().toArray(new String[0]); try (FileWriter fileWriter = new FileWriter(_dataFile); @@ -73,6 +71,11 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest { } } + @Override + protected String getDataFileName() { + return "data.csv"; + } + @Override protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap, List<Object[]> expectedPrimaryKeys) diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java deleted file mode 100644 index 997c1c0495..0000000000 --- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.plugin.inputformat.json; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import java.io.File; -import java.io.FileOutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.zip.GZIPOutputStream; -import org.apache.pinot.spi.utils.JsonUtils; - - -/** - * Unit test for {@link JSONRecordReader} for a Gzipped JSON file. - * Relies on the {@link JSONRecordReaderTest} for actual tests, by simply overriding - * the JSON file generation to generate a gzipped JSON file. - */ -public class GzippedJSONRecordReaderTest extends JSONRecordReaderTest { - private final File _dateFile = new File(_tempDir, "data.json"); - - protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) - throws Exception { - try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(_dateFile)), - StandardCharsets.UTF_8)) { - for (Map<String, Object> r : recordsToWrite) { - ObjectNode jsonRecord = JsonUtils.newObjectNode(); - for (String key : r.keySet()) { - jsonRecord.set(key, JsonUtils.objectToJsonNode(r.get(key))); - } - writer.write(jsonRecord.toString()); - } - } - } -} diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java index 81ab49983c..82f83e1779 100644 --- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java @@ -33,20 +33,19 @@ import org.testng.Assert; public class JSONRecordReaderTest extends AbstractRecordReaderTest { - private final File _dateFile = new File(_tempDir, "data.json"); @Override - protected RecordReader createRecordReader() + protected RecordReader createRecordReader(File file) throws Exception { JSONRecordReader recordReader = new JSONRecordReader(); - recordReader.init(_dateFile, _sourceFields, null); + recordReader.init(file, _sourceFields, null); return recordReader; } @Override protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) throws Exception { - try (FileWriter fileWriter = new FileWriter(_dateFile)) { + try (FileWriter fileWriter = new FileWriter(_dataFile)) { for (Map<String, Object> r : recordsToWrite) { ObjectNode jsonRecord = JsonUtils.newObjectNode(); for (String key : r.keySet()) { @@ -57,6 +56,11 @@ public class JSONRecordReaderTest extends AbstractRecordReaderTest { } } + @Override + protected String getDataFileName() { + return "data.json"; + } + @Override protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap, List<Object[]> expectedPrimaryKeys) diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java index f98ab58921..8a4d3fd709 100644 --- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java @@ -66,6 +66,8 @@ import static 
java.nio.charset.StandardCharsets.UTF_8; * </ul> */ public class ORCRecordReader implements RecordReader { + private static final String EXTENSION = "orc"; + private List<String> _orcFields; private List<TypeDescription> _orcFieldTypes; private boolean[] _includeOrcFields; @@ -78,7 +80,7 @@ public class ORCRecordReader implements RecordReader { public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { Configuration configuration = new Configuration(); - File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc"); + File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION); Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()), OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration))); TypeDescription orcSchema = orcReader.getSchema(); diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java index dd9b12a78c..c618b35285 100644 --- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java @@ -17,15 +17,9 @@ package org.apache.pinot.plugin.inputformat.orc; * specific language governing permissions and limitations * under the License. */ - import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.List; import java.util.Map; -import java.util.zip.GZIPOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -38,26 +32,17 @@ import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; import org.apache.pinot.spi.data.readers.RecordReader; -import org.testng.annotations.Test; import static java.nio.charset.StandardCharsets.UTF_8; public class ORCRecordReaderTest extends AbstractRecordReaderTest { - private final File _dataFile = new File(_tempDir, "data.orc"); - - private void compressGzip(String sourcePath, String targetPath) - throws IOException { - try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) { - Files.copy(Paths.get(sourcePath), gos); - } - } @Override - protected RecordReader createRecordReader() + protected RecordReader createRecordReader(File file) throws Exception { ORCRecordReader orcRecordReader = new ORCRecordReader(); - orcRecordReader.init(_dataFile, _sourceFields, null); + orcRecordReader.init(file, _sourceFields, null); return orcRecordReader; } @@ -158,16 +143,8 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest { writer.close(); } - @Test - public void testGzipORCRecordReader() - throws Exception { - String gzipFileName = "data.orc.gz"; - compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName)); - final File gzDataFile = new File(_tempDir, gzipFileName); - ORCRecordReader orcRecordReader = new ORCRecordReader(); - orcRecordReader.init(gzDataFile, _sourceFields, null); - checkValue(orcRecordReader, _records, _primaryKeys); - orcRecordReader.rewind(); - 
checkValue(orcRecordReader, _records, _primaryKeys); + @Override + protected String getDataFileName() { + return "data.orc"; } } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java index 787ddc25e7..8caf384630 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java @@ -41,6 +41,8 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils; * https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a> */ public class ParquetAvroRecordReader implements RecordReader { + private static final String EXTENSION = "parquet"; + private Path _dataFilePath; private AvroRecordExtractor _recordExtractor; private ParquetReader<GenericRecord> _parquetReader; @@ -49,7 +51,7 @@ public class ParquetAvroRecordReader implements RecordReader { @Override public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { - File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet"); + File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION); _dataFilePath = new Path(parquetFile.getAbsolutePath()); _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath); _recordExtractor = new AvroRecordExtractor(); diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java index da89c8a382..247bd6c82a 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java @@ -44,6 +44,8 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils; * Record reader for Native Parquet file. 
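ORCRecordReader and the Parquet readers (Avro, native, and the delegating ParquetRecordReader) all funnel through RecordReaderUtils.unpackIfRequired, whose target-file naming is reworked near the end of this diff: a ".gz" suffix is stripped, and a gzipped file without that suffix gets the expected extension appended. A hypothetical illustration (the paths are invented for the example):

    // Not gzipped: unpackIfRequired leaves the file alone.
    File plain = RecordReaderUtils.unpackIfRequired(new File("/tmp/data.orc"), "orc");

    // Gzipped and named *.gz: the suffix is stripped, unpacking to /tmp/data.orc.
    File suffixed = RecordReaderUtils.unpackIfRequired(new File("/tmp/data.orc.gz"), "orc");

    // Gzipped but arbitrarily named: the extension is appended, unpacking to /tmp/data.test.orc.
    File other = RecordReaderUtils.unpackIfRequired(new File("/tmp/data.test"), "orc");

Before this commit the target was always "<sourcePath>.<extension>", so "data.orc.gz" unpacked to the awkward "data.orc.gz.orc".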
*/ public class ParquetNativeRecordReader implements RecordReader { + private static final String EXTENSION = "parquet"; + private Path _dataFilePath; private ParquetNativeRecordExtractor _recordExtractor; private MessageType _schema; @@ -59,7 +61,7 @@ public class ParquetNativeRecordReader implements RecordReader { @Override public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { - File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet"); + File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION); _dataFilePath = new Path(parquetFile.getAbsolutePath()); _hadoopConf = ParquetUtils.getParquetHadoopConfiguration(); _recordExtractor = new ParquetNativeRecordExtractor(); diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java index 60886b3b30..0b119baad3 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java @@ -34,13 +34,15 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils; * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}. */ public class ParquetRecordReader implements RecordReader { + private static final String EXTENSION = "parquet"; + private RecordReader _internalParquetRecordReader; private boolean _useAvroParquetRecordReader = true; @Override public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) throws IOException { - File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet"); + File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION); if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) { _internalParquetRecordReader = new ParquetAvroRecordReader(); } else if (recordReaderConfig != null diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java index 378195d976..58177ee38c 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java @@ -34,13 +34,12 @@ import org.apache.pinot.spi.data.readers.RecordReader; public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest { - private final File _dataFile = new File(_tempDir, "data.parquet"); @Override - protected RecordReader createRecordReader() + protected RecordReader createRecordReader(File file) throws Exception { ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader(); - recordReader.init(_dataFile, _sourceFields, null); + recordReader.init(file, _sourceFields, null); return recordReader; } @@ -63,4 +62,9 @@ public class ParquetNativeRecordReaderTest extends 
AbstractRecordReaderTest { } } } + + @Override + protected String getDataFileName() { + return "data.parquet"; + } } diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java index 345cb1cdbd..78e96203ce 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java @@ -20,14 +20,10 @@ package org.apache.pinot.plugin.inputformat.parquet; import com.google.common.collect.ImmutableSet; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.zip.GZIPOutputStream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -43,16 +39,16 @@ import org.testng.annotations.Test; public class ParquetRecordReaderTest extends AbstractRecordReaderTest { - private final File _dataFile = new File(_tempDir, "data.parquet"); - private final String _gzipFileName = "data.parquet.gz"; private final File _testParquetFileWithInt96AndDecimal = new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile()); + private static final int NUM_RECORDS_TEST_PARQUET_WITH_INT96 = 1965; + @Override - protected RecordReader createRecordReader() + protected RecordReader createRecordReader(File file) throws Exception { ParquetRecordReader recordReader = new ParquetRecordReader(); - recordReader.init(_dataFile, _sourceFields, null); + recordReader.init(file, _sourceFields, null); return recordReader; } @@ -76,11 +72,9 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { } } - private void compressGzip(String sourcePath, String targetPath) - throws IOException { - try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) { - Files.copy(Paths.get(sourcePath), gos); - } + @Override + protected String getDataFileName() { + return "data.parquet"; } @Test @@ -122,9 +116,9 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader(); File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile()); - parquetRecordReader.init(nativeParquetFile, null, null); + parquetRecordReader2.init(nativeParquetFile, null, null); // Should be native since file metadata does not have avro schema - Assert.assertFalse(parquetRecordReader.useAvroParquetRecordReader()); + Assert.assertFalse(parquetRecordReader2.useAvroParquetRecordReader()); } @Test @@ -168,42 +162,4 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest { Assert.assertEquals(recordsRead, totalRecords, "Message read from ParquetRecordReader doesn't match the expected number."); } - - @Test - public void testGzipParquetRecordReader() - throws IOException { - compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName)); - final File gzDataFile = new File(_tempDir, _gzipFileName); - ParquetRecordReader recordReader = new 
ParquetRecordReader(); - recordReader.init(gzDataFile, _sourceFields, null); - testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE); - } - - @Test - public void testGzipParquetAvroRecordReader() - throws IOException { - ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader(); - compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName)); - final File gzDataFile = new File(_tempDir, _gzipFileName); - avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig()); - testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE); - } - - @Test - public void testGzipParquetNativeRecordReader() - throws IOException { - ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader(); - - final String gzParquetFileWithInt96AndDecimal = - String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getAbsolutePath()); - compressGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzParquetFileWithInt96AndDecimal); - final File gzTestParquetFileWithInt96AndDecimal = new File(gzParquetFileWithInt96AndDecimal); - nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig()); - testReadParquetFile(nativeRecordReader, 1965); - - compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName)); - final File gzDataFile = new File(_tempDir, _gzipFileName); - nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig()); - testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE); - } } diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java index ce9cd1b346..eac590a803 100644 --- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java @@ -20,7 +20,6 @@ package org.apache.pinot.plugin.inputformat.protobuf; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; @@ -35,7 +34,6 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; import org.apache.pinot.spi.data.readers.RecordReader; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -44,7 +42,7 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest { private final static Random RANDOM = new Random(System.currentTimeMillis()); private static final String PROTO_DATA = "_test_sample_proto_data.data"; private static final String DESCRIPTOR_FILE = "sample.desc"; - private File _tempFile; + private File _dataFile; private RecordReader _recordReader; private final static int SAMPLE_RECORDS_SIZE = 10000; @@ -57,7 +55,7 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest { .addMultiValueDimension("friends", FieldSpec.DataType.STRING).build(); } - private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) { + protected static List<Map<String, Object>> generateRandomRecords(Schema 
pinotSchema) { List<Map<String, Object>> records = new ArrayList<>(); for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) { @@ -115,13 +113,6 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest { _recordReader = createRecordReader(); } - @AfterClass - @Override - public void tearDown() - throws Exception { - FileUtils.forceDelete(_tempFile); - } - @Test public void testRecordReader() throws Exception { @@ -131,11 +122,11 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest { } @Override - protected RecordReader createRecordReader() + protected RecordReader createRecordReader(File file) throws Exception { RecordReader recordReader = new ProtoBufRecordReader(); Set<String> sourceFields = getSourceFields(getPinotSchema()); - recordReader.init(_tempFile, sourceFields, getProtoRecordReaderConfig()); + recordReader.init(file.getAbsoluteFile(), sourceFields, getProtoRecordReaderConfig()); return recordReader; } @@ -151,17 +142,21 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest { lists.add(sampleRecord); } - _tempFile = getSampleDataPath(); - try (FileOutputStream output = new FileOutputStream(_tempFile, true)) { + _dataFile = getSampleDataPath(); + try (FileOutputStream output = new FileOutputStream(_dataFile, true)) { for (Sample.SampleRecord record : lists) { record.writeDelimitedTo(output); } } } - private File getSampleDataPath() - throws IOException { - return File.createTempFile(ProtoBufRecordReaderTest.class.getName(), PROTO_DATA); + @Override + protected String getDataFileName() { + return PROTO_DATA; + } + + private File getSampleDataPath() { + return new File(_tempDir, PROTO_DATA); } private ProtoBufRecordReaderConfig getProtoRecordReaderConfig() diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java index 085fbf9414..167a210bcf 100644 --- a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java +++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java @@ -21,7 +21,7 @@ package org.apache.pinot.plugin.inputformat.thrift; import com.google.common.collect.Sets; import java.io.File; import java.io.FileWriter; -import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -32,116 +32,79 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; +import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; -import org.testng.Assert; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; /** * Test {@code org.apache.pinot.plugin.inputformat.thrift.data.ThriftRecordReader} for a given sample thrift * data. 
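Two points are easy to miss in the test hunks above. First, the deleted per-format gzip tests (testGzipORCRecordReader, testGzipParquetRecordReader, testGzipParquetAvroRecordReader, testGzipParquetNativeRecordReader) are consolidated, not dropped: a format-agnostic testGzipRecordReader in AbstractRecordReaderTest, shown near the end of this diff, now gives every reader test both the ".gz"-named and the extension-less gzip cases. Second, the ParquetRecordReaderTest hunk fixes a latent bug: the test previously initialized and asserted against its first reader instance twice, so the native-reader branch was never exercised; it now initializes parquetRecordReader2 and asserts on that instance.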
*/ -public class ThriftRecordReaderTest { +public class ThriftRecordReaderTest extends AbstractRecordReaderTest { private static final String THRIFT_DATA = "_test_sample_thrift_data.thrift"; - private File _tempFile; + private ThriftRecordReaderConfig getThriftRecordReaderConfig() { + ThriftRecordReaderConfig config = new ThriftRecordReaderConfig(); + config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData"); + return config; + } @BeforeClass public void setUp() throws Exception { - FileUtils.deleteQuietly(_tempFile); - - ThriftSampleData t1 = new ThriftSampleData(); - t1.setActive(true); - t1.setCreated_at(1515541280L); - t1.setId(1); - t1.setName("name1"); - List<Short> t1Groups = new ArrayList<>(2); - t1Groups.add((short) 1); - t1Groups.add((short) 4); - t1.setGroups(t1Groups); - Map<String, Long> mapValues = new HashMap<>(); - mapValues.put("name1", 1L); - t1.setMap_values(mapValues); - Set<String> namesSet = new HashSet<>(); - namesSet.add("name1"); - t1.setSet_values(namesSet); - - ThriftSampleData t2 = new ThriftSampleData(); - t2.setActive(false); - t2.setCreated_at(1515541290L); - t2.setId(2); - t2.setName("name2"); - List<Short> t2Groups = new ArrayList<>(2); - t2Groups.add((short) 2); - t2Groups.add((short) 3); - t2.setGroups(t2Groups); - List<ThriftSampleData> lists = new ArrayList<>(2); - lists.add(t1); - lists.add(t2); - TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory()); - _tempFile = getSampleDataPath(); - FileWriter writer = new FileWriter(_tempFile); - for (ThriftSampleData d : lists) { - IOUtils.write(binarySerializer.serialize(d), writer); - } - writer.close(); - } - - @Test - public void testReadData() - throws IOException { - ThriftRecordReader recordReader = new ThriftRecordReader(); - recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig()); - List<GenericRow> genericRows = new ArrayList<>(); - while (recordReader.hasNext()) { - genericRows.add(recordReader.next()); - } - recordReader.close(); - Assert.assertEquals(genericRows.size(), 2, "The number of rows return is incorrect"); - int id = 1; - for (GenericRow outputRow : genericRows) { - Assert.assertEquals(outputRow.getValue("id"), id); - Assert.assertNull(outputRow.getValue("map_values")); - id++; + if (_tempDir.exists()) { + FileUtils.cleanDirectory(_tempDir); } + FileUtils.forceMkdir(_tempDir); + // Generate Pinot schema + _pinotSchema = getPinotSchema(); + _sourceFields = getSourceFields(_pinotSchema); + // Generate random records based on Pinot schema + _records = generateRandomRecords(_pinotSchema); + _primaryKeys = generatePrimaryKeys(_records, getPrimaryKeyColumns()); + // Write generated random records to file + writeRecordsToFile(_records); + // Create and init RecordReader + _recordReader = createRecordReader(); } - @Test - public void testRewind() - throws IOException { - ThriftRecordReader recordReader = new ThriftRecordReader(); - recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig()); - List<GenericRow> genericRows = new ArrayList<>(); - while (recordReader.hasNext()) { - genericRows.add(recordReader.next()); - } - - recordReader.rewind(); - - while (recordReader.hasNext()) { - genericRows.add(recordReader.next()); - } - recordReader.close(); - Assert.assertEquals(genericRows.size(), 4, "The number of rows return after the rewind is incorrect"); + protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) { + // TODO: instead of hardcoding some rows, change this to work with 
the AbstractRecordReader's random value generator + List<Map<String, Object>> records = new ArrayList<>(); + Map<String, Object> record1 = new HashMap<>(); + record1.put("active", "true"); + record1.put("created_at", 1515541280L); + record1.put("id", 1); + List<Integer> groups1 = new ArrayList<>(); + groups1.add(1); + groups1.add(4); + record1.put("groups", groups1); + Map<String, Long> mapValues1 = new HashMap<>(); + mapValues1.put("name1", 1L); + record1.put("map_values", mapValues1); + List<String> setValues1 = new ArrayList<>(); + setValues1.add("name1"); + record1.put("set_values", setValues1); + records.add(record1); + + Map<String, Object> record2 = new HashMap<>(); + record2.put("active", "false"); + record2.put("created_at", 1515541290L); + record2.put("id", 1); + List<Integer> groups2 = new ArrayList<>(); + groups2.add(2); + groups2.add(3); + record2.put("groups", groups2); + records.add(record2); + + return records; } - private File getSampleDataPath() - throws IOException { - return File.createTempFile(ThriftRecordReaderTest.class.getName(), THRIFT_DATA); - } - - private ThriftRecordReaderConfig getThriftRecordReaderConfig() { - ThriftRecordReaderConfig config = new ThriftRecordReaderConfig(); - config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData"); - return config; - } - - private Schema getSchema() { + @Override + protected org.apache.pinot.spi.data.Schema getPinotSchema() { return new Schema.SchemaBuilder().setSchemaName("ThriftSampleData") .addSingleValueDimension("id", FieldSpec.DataType.INT) .addSingleValueDimension("name", FieldSpec.DataType.STRING) @@ -155,8 +118,54 @@ public class ThriftRecordReaderTest { return Sets.newHashSet("id", "name", "created_at", "active", "groups", "set_values"); } - @AfterClass - public void tearDown() { - FileUtils.deleteQuietly(_tempFile); + @Override + protected RecordReader createRecordReader(File file) + throws Exception { + ThriftRecordReader recordReader = new ThriftRecordReader(); + recordReader.init(file, getSourceFields(), getThriftRecordReaderConfig()); + return recordReader; + } + + @Override + protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) + throws Exception { + List<ThriftSampleData> dataList = new ArrayList<>(recordsToWrite.size()); + for (Map<String, Object> record : recordsToWrite) { + ThriftSampleData data = new ThriftSampleData(); + data.setActive(Boolean.parseBoolean(record.get("active").toString())); + data.setCreated_at(Math.abs(((Long) record.get("created_at")).longValue())); + int i = Math.abs(((Integer) record.get("id")).intValue()); + data.setId(i); + data.setName((String) record.get("name")); + List<Integer> groupsList = (List<Integer>) record.get("groups"); + if (groupsList != null) { + List<Short> groupsResult = new ArrayList<>(groupsList.size()); + for (Integer num : groupsList) { + groupsResult.add(num.shortValue()); + } + data.setGroups(groupsResult); + } + List<String> setValuesList = (List<String>) record.get("set_values"); + if (setValuesList != null) { + Set<String> setValuesResult = new HashSet<>(setValuesList.size()); + for (String s : setValuesList) { + setValuesResult.add(s); + } + data.setSet_values(setValuesResult); + } + dataList.add(data); + } + + TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory()); + FileWriter writer = new FileWriter(_dataFile); + for (ThriftSampleData d : dataList) { + IOUtils.write(binarySerializer.serialize(d), writer, Charset.defaultCharset()); + } + writer.close(); + } + + @Override + 
protected String getDataFileName() { + return THRIFT_DATA; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java index 90a29dbd5d..f4efdb6d2d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java @@ -184,6 +184,9 @@ public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> { return bytesValue; } if (value instanceof Number || value instanceof byte[]) { + if (value instanceof Short) { + return Integer.valueOf(value.toString()); + } return value; } return value.toString(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java index fd84c5f306..780eeeb5ce 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java @@ -65,7 +65,13 @@ public class RecordReaderUtils { throws IOException { if (isGZippedFile(dataFile)) { try (final InputStream inputStream = getInputStream(dataFile)) { - File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension)); + String targetFileName = dataFile.getName(); + if (targetFileName.endsWith(".gz")) { + targetFileName = targetFileName.substring(0, targetFileName.length() - 3); + } else { + targetFileName = targetFileName + "." + extension; + } + File targetFile = new File(dataFile.getParentFile(), targetFileName); Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); return targetFile; } @@ -74,7 +80,7 @@ public class RecordReaderUtils { } } - private static boolean isGZippedFile(File file) + public static boolean isGZippedFile(File file) throws IOException { int magic = 0; try (RandomAccessFile raf = new RandomAccessFile(file, "r")) { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java index b4821bb54c..de587b3e10 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java @@ -20,12 +20,17 @@ package org.apache.pinot.spi.data.readers; import com.google.common.collect.Sets; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.zip.GZIPOutputStream; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pinot.spi.data.FieldSpec; @@ -42,13 +47,14 @@ public abstract class AbstractRecordReaderTest { protected final static int SAMPLE_RECORDS_SIZE = 10000; protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest"); + protected final File _dataFile = new File(_tempDir, getDataFileName()); protected List<Map<String, Object>> _records; protected List<Object[]> _primaryKeys; protected org.apache.pinot.spi.data.Schema _pinotSchema; protected Set<String> _sourceFields; - private RecordReader _recordReader; + 
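The three-line BaseRecordExtractor change above is what lets the reworked Thrift test pass through the shared assertions: Pinot's FieldSpec.DataType has no 16-bit integer type, so a Thrift i16 field (such as groups in ThriftSampleData) surfaces as java.lang.Short, and a boxed Short is never equal to the boxed Integer the generic test expects for an INT column. Widening up front resolves that, roughly (illustrative fragment):

    // A Thrift i16 value arrives boxed as java.lang.Short.
    Object raw = Short.valueOf((short) 4);
    // BaseRecordExtractor now promotes it before returning: Short -> Integer.
    Object converted = (raw instanceof Short) ? Integer.valueOf(raw.toString()) : raw;
    // converted is Integer(4); Short.valueOf((short) 4).equals(4) would be false.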
protected RecordReader _recordReader; - private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) { + protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) { List<Map<String, Object>> records = new ArrayList<>(); for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) { @@ -99,6 +105,8 @@ public abstract class AbstractRecordReaderTest { return RANDOM.nextDouble(); case STRING: return RandomStringUtils.randomAscii(RANDOM.nextInt(50) + 1); + case BOOLEAN: + return RANDOM.nextBoolean(); default: throw new RuntimeException("Not supported fieldSpec - " + fieldSpec); } @@ -117,9 +125,11 @@ public abstract class AbstractRecordReaderTest { } else { Object[] actualRecords = (Object[]) actualRecord.getValue(fieldSpecName); List expectedRecords = (List) expectedRecord.get(fieldSpecName); - Assert.assertEquals(actualRecords.length, expectedRecords.size()); - for (int j = 0; j < actualRecords.length; j++) { - Assert.assertEquals(actualRecords[j], expectedRecords.get(j)); + if (expectedRecords != null) { + Assert.assertEquals(actualRecords.length, expectedRecords.size()); + for (int j = 0; j < actualRecords.length; j++) { + Assert.assertEquals(actualRecords[j], expectedRecords.get(j)); + } } } } @@ -155,6 +165,14 @@ public abstract class AbstractRecordReaderTest { return sourceFields; } + protected File compressFileWithGzip(String sourcePath, String targetPath) + throws IOException { + try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) { + Files.copy(Paths.get(sourcePath), gos); + return new File(targetPath); + } + } + @BeforeClass public void setUp() throws Exception { @@ -188,17 +206,55 @@ public abstract class AbstractRecordReaderTest { checkValue(_recordReader, _records, _primaryKeys); } + @Test + public void testGzipRecordReader() + throws Exception { + // Test Gzipped Avro file that ends with ".gz" + File gzipDataFile = new File(_tempDir, _dataFile.getName() + ".gz"); + compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile.getAbsolutePath()); + RecordReader recordReader = createRecordReader(gzipDataFile); + checkValue(recordReader, _records, _primaryKeys); + recordReader.rewind(); + checkValue(recordReader, _records, _primaryKeys); + + // Test Gzipped Avro file that doesn't end with '.gz'. 
+ File gzipDataFile2 = new File(_tempDir, _dataFile.getName() + ".test"); + compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile2.getAbsolutePath()); + recordReader = createRecordReader(gzipDataFile2); + checkValue(recordReader, _records, _primaryKeys); + recordReader.rewind(); + checkValue(recordReader, _records, _primaryKeys); + } + /** - * @return an implementation of RecordReader + * Create the record reader given a file + * + * @param file input file + * @return an implementation of RecordReader of the given file * @throws Exception */ - protected abstract RecordReader createRecordReader() + protected abstract RecordReader createRecordReader(File file) throws Exception; + /** + * @return an implementation of RecordReader + * @throws Exception + */ + protected RecordReader createRecordReader() + throws Exception { + return createRecordReader(_dataFile); + } + /** * Write records into a file * @throws Exception */ protected abstract void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) throws Exception; + + /** + * Get data file name + * @throws Exception + */ + protected abstract String getDataFileName(); }
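Net effect: a reader no longer cares how a gzipped input is named, and every format's test suite inherits the gzip coverage from this base class. A hypothetical end-to-end sketch against the Avro reader (the paths and the suffix-less file name are invented; passing null for fieldsToRead is assumed to mean "read all fields"):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.nio.file.Files;
    import java.util.zip.GZIPOutputStream;
    import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class GzipAvroReadSketch {
      public static void main(String[] args) throws Exception {
        File avro = new File("/tmp/data.avro");            // assumed to already exist
        File gzipped = new File("/tmp/data.avro.backup");  // deliberately not *.gz
        // Gzip the Avro file under a name that carries no ".gz" hint.
        try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(gzipped))) {
          Files.copy(avro.toPath(), gos);
        }
        AvroRecordReader reader = new AvroRecordReader();
        reader.init(gzipped, null, null);  // gzip detected by magic bytes, not by name
        while (reader.hasNext()) {
          GenericRow row = reader.next();
          System.out.println(row);
        }
        reader.close();
      }
    }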