This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new b69453e Adding support for Protobuf input format (#5293) b69453e is described below commit b69453e272b09314071183fc40a6516d64b0a412 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Wed May 27 23:50:14 2020 +0530 Adding support for Protobuf input format (#5293) Authored-by: Kartik Khare <kartikkhare@Kartiks-MacBook-Pro.local> --- .../v0_deprecated/pinot-hadoop/pom.xml | 11 + .../hadoop/job/mappers/SegmentCreationMapper.java | 10 + .../v0_deprecated/pinot-spark/pom.xml | 12 +- .../spark/jobs/SparkSegmentCreationFunction.java | 10 + .../{ => pinot-protobuf}/pom.xml | 45 +- .../protobuf/ProtoBufRecordExtractor.java | 49 + .../inputformat/protobuf/ProtoBufRecordReader.java | 137 +++ .../protobuf/ProtoBufRecordReaderConfig.java | 18 +- .../protobuf/ProtoBufRecordReaderTest.java | 173 ++++ .../pinot/plugin/inputformat/protobuf/Sample.java | 1075 ++++++++++++++++++++ .../pinot-protobuf/src/test/resources/sample.desc | 8 + .../pinot-protobuf/src/test/resources/sample.proto | 9 + pinot-plugins/pinot-input-format/pom.xml | 1 + .../apache/pinot/spi/data/readers/FileFormat.java | 2 +- .../org/apache/pinot/spi/utils/ResourceFinder.java | 78 ++ 15 files changed, 1600 insertions(+), 38 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml index 6b7450a..1fd7033 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/pom.xml @@ -147,6 +147,17 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-protobuf</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + 
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </dependency> diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java index 47242e5..52c3393 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java @@ -44,6 +44,7 @@ import org.apache.pinot.core.segment.name.SegmentNameGenerator; import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; +import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig; import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -330,6 +331,15 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab return readerConfig; } } + + if (fileFormat == FileFormat.PROTO) { + try (InputStream inputStream = FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) { + ProtoBufRecordReaderConfig readerConfig = + JsonUtils.inputStreamToObject(inputStream, ProtoBufRecordReaderConfig.class); + _logger.info("Using Protocol Buffer record reader config: {}", readerConfig); + return readerConfig; + } + } } return null; } diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml index 
964a585..e8850b2 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/pom.xml @@ -160,7 +160,17 @@ </exclusion> </exclusions> </dependency> - + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-protobuf</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- Spark --> <dependency> <groupId>org.apache.spark</groupId> diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java index 80d7434..b234e3e 100644 --- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java +++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java @@ -41,6 +41,7 @@ import org.apache.pinot.core.segment.name.SegmentNameGenerator; import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator; import org.apache.pinot.ingestion.common.JobConfigConstants; import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig; +import org.apache.pinot.plugin.inputformat.protobuf.ProtoBufRecordReaderConfig; import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -297,6 +298,15 @@ public class SparkSegmentCreationFunction implements Serializable { return readerConfig; } } + + if (fileFormat == FileFormat.PROTO) { + try (InputStream inputStream = 
FileSystem.get(_readerConfigFile.toUri(), _jobConf).open(_readerConfigFile)) { + ProtoBufRecordReaderConfig readerConfig = + JsonUtils.inputStreamToObject(inputStream, ProtoBufRecordReaderConfig.class); + _logger.info("Using Protocol Buffer record reader config: {}", readerConfig); + return readerConfig; + } + } } return null; } diff --git a/pinot-plugins/pinot-input-format/pom.xml b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml similarity index 55% copy from pinot-plugins/pinot-input-format/pom.xml copy to pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml index bf7dd9c..d018a89 100644 --- a/pinot-plugins/pinot-input-format/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml @@ -19,54 +19,33 @@ under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>pinot-plugins</artifactId> + <artifactId>pinot-input-format</artifactId> <groupId>org.apache.pinot</groupId> <version>${revision}${sha1}</version> <relativePath>..</relativePath> </parent> - <artifactId>pinot-input-format</artifactId> - <packaging>pom</packaging> - <name>Pinot Input Format </name> + + <artifactId>pinot-protobuf</artifactId> + <name>Pinot Protocol Buffers</name> <url>https://pinot.apache.org/</url> <properties> - <pinot.root>${basedir}/../..</pinot.root> - <plugin.type>pinot-input-format</plugin.type> + <pinot.root>${basedir}/../../..</pinot.root> + <proto.version>3.11.4</proto.version> </properties> - - <modules> - <module>pinot-avro</module> - <module>pinot-avro-base</module> - <module>pinot-confluent-avro</module> - <module>pinot-orc</module> - <module>pinot-json</module> - <module>pinot-parquet</module> - <module>pinot-csv</module> - 
<module>pinot-thrift</module> - </modules> - <dependencies> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <!-- test --> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <scope>test</scope> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> </dependency> <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-spi</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${proto.version}</version> </dependency> </dependencies> </project> diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java new file mode 100644 index 0000000..6978fe7 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordExtractor.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.protobuf; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import java.util.Set; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordExtractor; +import org.apache.pinot.spi.data.readers.RecordExtractorConfig; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; + + +public class ProtoBufRecordExtractor implements RecordExtractor<Message> { + + private Set<String> _fields; + + @Override + public void init(Set<String> fields, RecordExtractorConfig recordExtractorConfig) { + _fields = fields; + } + + @Override + public GenericRow extract(Message from, GenericRow to) { + for (String fieldName : _fields) { + Descriptors.FieldDescriptor fieldDescriptor = from.getDescriptorForType().findFieldByName(fieldName); + Object value = from.getField(fieldDescriptor); + Object convertedValue = RecordReaderUtils.convert(value); + to.putValue(fieldName, convertedValue); + } + return to; + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java new file mode 100644 index 0000000..2da8629 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.inputformat.protobuf; + +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Message; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.data.readers.RecordReaderUtils; +import org.apache.pinot.spi.utils.ResourceFinder; + + +public class ProtoBufRecordReader implements RecordReader { + private File _dataFile; + private ProtoBufRecordExtractor _recordExtractor; + + private InputStream _inputStream; + private boolean _hasNext; + private DynamicMessage _dynamicMessage; + + private boolean hasMoreToRead() + throws IOException { + _inputStream.mark(1); + int nextByte = _inputStream.read(); + _inputStream.reset(); + return nextByte != -1; + } + + private void init() + throws IOException { + _inputStream = RecordReaderUtils.getBufferedInputStream(_dataFile); + try { + _hasNext = hasMoreToRead(); + } catch (Exception e) { + _inputStream.close(); + throw e; + } + } + + 
@Override + public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) + throws IOException { + _dataFile = dataFile; + ProtoBufRecordReaderConfig protoBufRecordReaderConfig = (ProtoBufRecordReaderConfig) recordReaderConfig; + InputStream fin = getDescriptorFileInputStream(protoBufRecordReaderConfig); + Descriptors.Descriptor descriptor = buildProtoBufDescriptor(fin); + _recordExtractor = new ProtoBufRecordExtractor(); + _recordExtractor.init(fieldsToRead, null); + _dynamicMessage = DynamicMessage.getDefaultInstance(descriptor); + init(); + } + + private Descriptors.Descriptor buildProtoBufDescriptor(InputStream fin) + throws IOException { + try { + DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(fin); + Descriptors.FileDescriptor fileDescriptor = + Descriptors.FileDescriptor.buildFrom(set.getFile(0), new Descriptors.FileDescriptor[]{}); + return fileDescriptor.getMessageTypes().get(0); + } catch (Descriptors.DescriptorValidationException e) { + throw new IOException("Descriptor file validation failed", e); + } + } + + private InputStream getDescriptorFileInputStream(ProtoBufRecordReaderConfig protoBufRecordReaderConfig) + throws IOException { + URI descriptorFileURI = protoBufRecordReaderConfig.getDescriptorFile(); + return ResourceFinder.openResource(descriptorFileURI); + } + + @Override + public boolean hasNext() { + return _hasNext; + } + + @Override + public GenericRow next() + throws IOException { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { + Message message = null; + try { + Message.Builder builder = _dynamicMessage.newBuilderForType(); + builder.mergeDelimitedFrom(_inputStream); + message = builder.build(); + } catch (Exception e) { + throw new IOException("Caught exception while reading protobuf object", e); + } + _recordExtractor.extract(message, reuse); + _hasNext = hasMoreToRead(); + return reuse; 
+ } + + @Override + public void rewind() + throws IOException { + _inputStream.close(); + init(); + } + + @Override + public void close() + throws IOException { + _inputStream.close(); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderConfig.java similarity index 66% copy from pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java copy to pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderConfig.java index 532fd16..8b3eaec 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderConfig.java @@ -16,8 +16,20 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.spi.data.readers; +package org.apache.pinot.plugin.inputformat.protobuf; -public enum FileFormat { - AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, ORC, OTHER +import java.net.URI; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; + + +public class ProtoBufRecordReaderConfig implements RecordReaderConfig { + private URI _descriptorFile; + + public URI getDescriptorFile() { + return _descriptorFile; + } + + public void setDescriptorFile(URI descriptorFile) { + _descriptorFile = descriptorFile; + } } diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java new file mode 100644 index 0000000..ad30391 --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.inputformat.protobuf; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest { + private final static Random RANDOM = new Random(System.currentTimeMillis()); + private static final String PROTO_DATA = "_test_sample_proto_data.data"; + private static final String DESCRIPTOR_FILE = "sample.desc"; + private File _tempFile; + private RecordReader _recordReader; + private final static int SAMPLE_RECORDS_SIZE = 10000; + + @Override + protected Schema getPinotSchema() { + return new Schema.SchemaBuilder().setSchemaName("SampleRecord") + .addSingleValueDimension("id", FieldSpec.DataType.INT) + .addSingleValueDimension("name", FieldSpec.DataType.STRING) + .addSingleValueDimension("email", FieldSpec.DataType.STRING) + .addMultiValueDimension("friends", FieldSpec.DataType.STRING).build(); + } + + private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) { + List<Map<String, Object>> records = new ArrayList<>(); + + for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) { + Map<String, Object> recordMap = new HashMap<>(); + for (FieldSpec fieldSpec : pinotSchema.getAllFieldSpecs()) { + recordMap.put(fieldSpec.getName(), generateRandomValue(fieldSpec)); + } + records.add(recordMap); + } + 
return records; + } + + private static Object generateRandomValue(FieldSpec fieldSpec) { + if (fieldSpec.isSingleValueField()) { + return generateRandomSingleValue(fieldSpec); + } + List list = new ArrayList(); + int listSize = 1 + RANDOM.nextInt(50); + for (int i = 0; i < listSize; i++) { + list.add(generateRandomSingleValue(fieldSpec)); + } + return list; + } + + private static Object generateRandomSingleValue(FieldSpec fieldSpec) { + switch (fieldSpec.getDataType()) { + case INT: + return RANDOM.nextInt(); + case LONG: + return RANDOM.nextLong(); + case FLOAT: + return RANDOM.nextFloat(); + case DOUBLE: + return RANDOM.nextDouble(); + case STRING: + return RandomStringUtils.randomAscii(RANDOM.nextInt(50) + 1); + default: + throw new RuntimeException("Not supported fieldSpec - " + fieldSpec); + } + } + + @BeforeClass + @Override + public void setUp() + throws Exception { + FileUtils.forceMkdir(_tempDir); + // Generate Pinot schema + _pinotSchema = getPinotSchema(); + // Generate random records based on Pinot schema + _records = generateRandomRecords(_pinotSchema); + // Write generated random records to file + writeRecordsToFile(_records); + // Create and init RecordReader + _recordReader = createRecordReader(); + } + + @AfterClass + @Override + public void tearDown() + throws Exception { + FileUtils.forceDelete(_tempFile); + } + + @Test + public void testRecordReader() + throws Exception { + checkValue(_recordReader, _records); + _recordReader.rewind(); + checkValue(_recordReader, _records); + } + + @Override + protected RecordReader createRecordReader() + throws Exception { + RecordReader recordReader = new ProtoBufRecordReader(); + Set<String> sourceFields = getSourceFields(getPinotSchema()); + recordReader.init(_tempFile, sourceFields, getProtoRecordReaderConfig()); + return recordReader; + } + + @Override + protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite) + throws Exception { + List<Sample.SampleRecord> lists = new ArrayList<>(); + 
for (Map<String, Object> record : recordsToWrite) { + Sample.SampleRecord sampleRecord = + Sample.SampleRecord.newBuilder().setEmail((String) record.get("email")).setName((String) record.get("name")) + .setId((Integer) record.get("id")).addAllFriends((List<String>) record.get("friends")).build(); + + lists.add(sampleRecord); + } + + _tempFile = getSampleDataPath(); + try (FileOutputStream output = new FileOutputStream(_tempFile, true)) { + for (Sample.SampleRecord record : lists) { + record.writeDelimitedTo(output); + } + } + } + + private File getSampleDataPath() + throws IOException { + return File.createTempFile(ProtoBufRecordReaderTest.class.getName(), PROTO_DATA); + } + + private ProtoBufRecordReaderConfig getProtoRecordReaderConfig() + throws URISyntaxException { + ProtoBufRecordReaderConfig config = new ProtoBufRecordReaderConfig(); + URL descriptorFile = getClass().getClassLoader().getResource(DESCRIPTOR_FILE); + config.setDescriptorFile(descriptorFile.toURI()); + return config; + } +} diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/Sample.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/Sample.java new file mode 100644 index 0000000..86fdb5d --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/Sample.java @@ -0,0 +1,1075 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: sample.proto + +package org.apache.pinot.plugin.inputformat.protobuf; + +public final class Sample { + private Sample() { + } + + public static void registerAllExtensions(com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions(com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions((com.google.protobuf.ExtensionRegistryLite) registry); + } + + public interface SampleRecordOrBuilder extends + // @@protoc_insertion_point(interface_extends:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord) + com.google.protobuf.MessageOrBuilder { + + /** + * <code>string name = 1;</code> + * @return The name. + */ + java.lang.String getName(); + + /** + * <code>string name = 1;</code> + * @return The bytes for name. + */ + com.google.protobuf.ByteString getNameBytes(); + + /** + * <code>int32 id = 2;</code> + * @return The id. + */ + int getId(); + + /** + * <code>string email = 3;</code> + * @return The email. + */ + java.lang.String getEmail(); + + /** + * <code>string email = 3;</code> + * @return The bytes for email. + */ + com.google.protobuf.ByteString getEmailBytes(); + + /** + * <code>repeated string friends = 4;</code> + * @return A list containing the friends. + */ + java.util.List<java.lang.String> getFriendsList(); + + /** + * <code>repeated string friends = 4;</code> + * @return The count of friends. 
+ */ + int getFriendsCount(); + + /** + * <code>repeated string friends = 4;</code> + * @param index The index of the element to return. + * @return The friends at the given index. + */ + java.lang.String getFriends(int index); + + /** + * <code>repeated string friends = 4;</code> + * @param index The index of the value to return. + * @return The bytes of the friends at the given index. + */ + com.google.protobuf.ByteString getFriendsBytes(int index); + } + + /** + * Protobuf type {@code org.apache.pinot.plugin.inputformat.protobuf.SampleRecord} + */ + public static final class SampleRecord extends com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord) + SampleRecordOrBuilder { + private static final long serialVersionUID = 0L; + + // Use SampleRecord.newBuilder() to construct. + private SampleRecord(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) { + super(builder); + } + + private SampleRecord() { + name_ = ""; + email_ = ""; + friends_ = com.google.protobuf.LazyStringArrayList.EMPTY; + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance(UnusedPrivateParameter unused) { + return new SampleRecord(); + } + + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet getUnknownFields() { + return this.unknownFields; + } + + private SampleRecord(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + 
java.lang.String s = input.readStringRequireUtf8(); + + name_ = s; + break; + } + case 16: { + + id_ = input.readInt32(); + break; + } + case 26: { + java.lang.String s = input.readStringRequireUtf8(); + + email_ = s; + break; + } + case 34: { + java.lang.String s = input.readStringRequireUtf8(); + if (!((mutable_bitField0_ & 0x00000001) != 0)) { + friends_ = new com.google.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000001; + } + friends_.add(s); + break; + } + default: { + if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException(e).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) != 0)) { + friends_ = friends_.getUnmodifiableView(); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + + public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { + return org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { + return org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable + .ensureFieldAccessorsInitialized(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.class, + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.Builder.class); + } + + public static final int NAME_FIELD_NUMBER = 1; + private volatile java.lang.Object name_; + + /** + * <code>string name = 1;</code> + * @return The name. 
+ */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } + } + + /** + * <code>string name = 1;</code> + * @return The bytes for name. + */ + public com.google.protobuf.ByteString getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int ID_FIELD_NUMBER = 2; + private int id_; + + /** + * <code>int32 id = 2;</code> + * @return The id. + */ + public int getId() { + return id_; + } + + public static final int EMAIL_FIELD_NUMBER = 3; + private volatile java.lang.Object email_; + + /** + * <code>string email = 3;</code> + * @return The email. + */ + public java.lang.String getEmail() { + java.lang.Object ref = email_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + email_ = s; + return s; + } + } + + /** + * <code>string email = 3;</code> + * @return The bytes for email. + */ + public com.google.protobuf.ByteString getEmailBytes() { + java.lang.Object ref = email_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + email_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int FRIENDS_FIELD_NUMBER = 4; + private com.google.protobuf.LazyStringList friends_; + + /** + * <code>repeated string friends = 4;</code> + * @return A list containing the friends. 
+ */ + public com.google.protobuf.ProtocolStringList getFriendsList() { + return friends_; + } + + /** + * <code>repeated string friends = 4;</code> + * @return The count of friends. + */ + public int getFriendsCount() { + return friends_.size(); + } + + /** + * <code>repeated string friends = 4;</code> + * @param index The index of the element to return. + * @return The friends at the given index. + */ + public java.lang.String getFriends(int index) { + return friends_.get(index); + } + + /** + * <code>repeated string friends = 4;</code> + * @param index The index of the value to return. + * @return The bytes of the friends at the given index. + */ + public com.google.protobuf.ByteString getFriendsBytes(int index) { + return friends_.getByteString(index); + } + + private byte memoizedIsInitialized = -1; + + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) { + return true; + } + if (isInitialized == 0) { + return false; + } + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (!getNameBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 1, name_); + } + if (id_ != 0) { + output.writeInt32(2, id_); + } + if (!getEmailBytes().isEmpty()) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 3, email_); + } + for (int i = 0; i < friends_.size(); i++) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 4, friends_.getRaw(i)); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) { + return size; + } + + size = 0; + if (!getNameBytes().isEmpty()) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, name_); + } + if (id_ != 0) { + size += com.google.protobuf.CodedOutputStream.computeInt32Size(2, id_); + } + if 
(!getEmailBytes().isEmpty()) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, email_); + } + { + int dataSize = 0; + for (int i = 0; i < friends_.size(); i++) { + dataSize += computeStringSizeNoTag(friends_.getRaw(i)); + } + size += dataSize; + size += 1 * getFriendsList().size(); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord)) { + return super.equals(obj); + } + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord other = + (org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) obj; + + if (!getName().equals(other.getName())) { + return false; + } + if (getId() != other.getId()) { + return false; + } + if (!getEmail().equals(other.getEmail())) { + return false; + } + if (!getFriendsList().equals(other.getFriendsList())) { + return false; + } + if (!unknownFields.equals(other.unknownFields)) { + return false; + } + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + hash = (37 * hash) + NAME_FIELD_NUMBER; + hash = (53 * hash) + getName().hashCode(); + hash = (37 * hash) + ID_FIELD_NUMBER; + hash = (53 * hash) + getId(); + hash = (37 * hash) + EMAIL_FIELD_NUMBER; + hash = (53 * hash) + getEmail().hashCode(); + if (getFriendsCount() > 0) { + hash = (37 * hash) + FRIENDS_FIELD_NUMBER; + hash = (53 * hash) + getFriendsList().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return 
PARSER.parseFrom(data); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom( + com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom(java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input, extensionRegistry); + } + + public static 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseDelimitedFrom( + java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3.parseDelimitedWithIOException(PARSER, input); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseDelimitedFrom( + java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3.parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parseFrom( + com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3.parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { + return newBuilder(); + } + + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + + public static Builder newBuilder(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE ? 
new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType(com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + + /** + * Protobuf type {@code org.apache.pinot.plugin.inputformat.protobuf.SampleRecord} + */ + public static final class Builder extends com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements + // @@protoc_insertion_point(builder_implements:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord) + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecordOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { + return org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internalGetFieldAccessorTable() { + return org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable + .ensureFieldAccessorsInitialized(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.class, + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.Builder.class); + } + + // Construct using org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessageV3.alwaysUseFieldBuilders) { + } + } + + @java.lang.Override + public Builder clear() { + super.clear(); + name_ = ""; + + id_ = 0; + + email_ = ""; + + friends_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + 
return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { + return org.apache.pinot.plugin.inputformat.protobuf.Sample.internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor; + } + + @java.lang.Override + public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord getDefaultInstanceForType() { + return org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.getDefaultInstance(); + } + + @java.lang.Override + public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord build() { + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord buildPartial() { + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord result = + new org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord(this); + int from_bitField0_ = bitField0_; + result.name_ = name_; + result.id_ = id_; + result.email_ = email_; + if (((bitField0_ & 0x00000001) != 0)) { + friends_ = friends_.getUnmodifiableView(); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.friends_ = friends_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + + @java.lang.Override + public Builder setField(com.google.protobuf.Descriptors.FieldDescriptor field, java.lang.Object value) { + return super.setField(field, value); + } + + @java.lang.Override + public Builder clearField(com.google.protobuf.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + + @java.lang.Override + public Builder clearOneof(com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + + @java.lang.Override + public Builder 
setRepeatedField(com.google.protobuf.Descriptors.FieldDescriptor field, int index, + java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + + @java.lang.Override + public Builder addRepeatedField(com.google.protobuf.Descriptors.FieldDescriptor field, java.lang.Object value) { + return super.addRepeatedField(field, value); + } + + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) { + return mergeFrom((org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord other) { + if (other == org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord.getDefaultInstance()) { + return this; + } + if (!other.getName().isEmpty()) { + name_ = other.name_; + onChanged(); + } + if (other.getId() != 0) { + setId(other.getId()); + } + if (!other.getEmail().isEmpty()) { + email_ = other.email_; + onChanged(); + } + if (!other.friends_.isEmpty()) { + if (friends_.isEmpty()) { + friends_ = other.friends_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureFriendsIsMutable(); + friends_.addAll(other.friends_); + } + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = 
(org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + + private int bitField0_; + + private java.lang.Object name_ = ""; + + /** + * <code>string name = 1;</code> + * @return The name. + */ + public java.lang.String getName() { + java.lang.Object ref = name_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + name_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + + /** + * <code>string name = 1;</code> + * @return The bytes for name. + */ + public com.google.protobuf.ByteString getNameBytes() { + java.lang.Object ref = name_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + name_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + /** + * <code>string name = 1;</code> + * @param value The name to set. + * @return This builder for chaining. + */ + public Builder setName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + name_ = value; + onChanged(); + return this; + } + + /** + * <code>string name = 1;</code> + * @return This builder for chaining. + */ + public Builder clearName() { + + name_ = getDefaultInstance().getName(); + onChanged(); + return this; + } + + /** + * <code>string name = 1;</code> + * @param value The bytes for name to set. + * @return This builder for chaining. + */ + public Builder setNameBytes(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + name_ = value; + onChanged(); + return this; + } + + private int id_; + + /** + * <code>int32 id = 2;</code> + * @return The id. 
+ */ + public int getId() { + return id_; + } + + /** + * <code>int32 id = 2;</code> + * @param value The id to set. + * @return This builder for chaining. + */ + public Builder setId(int value) { + + id_ = value; + onChanged(); + return this; + } + + /** + * <code>int32 id = 2;</code> + * @return This builder for chaining. + */ + public Builder clearId() { + + id_ = 0; + onChanged(); + return this; + } + + private java.lang.Object email_ = ""; + + /** + * <code>string email = 3;</code> + * @return The email. + */ + public java.lang.String getEmail() { + java.lang.Object ref = email_; + if (!(ref instanceof java.lang.String)) { + com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + email_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + + /** + * <code>string email = 3;</code> + * @return The bytes for email. + */ + public com.google.protobuf.ByteString getEmailBytes() { + java.lang.Object ref = email_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((java.lang.String) ref); + email_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + /** + * <code>string email = 3;</code> + * @param value The email to set. + * @return This builder for chaining. + */ + public Builder setEmail(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + + email_ = value; + onChanged(); + return this; + } + + /** + * <code>string email = 3;</code> + * @return This builder for chaining. + */ + public Builder clearEmail() { + + email_ = getDefaultInstance().getEmail(); + onChanged(); + return this; + } + + /** + * <code>string email = 3;</code> + * @param value The bytes for email to set. + * @return This builder for chaining. 
+ */ + public Builder setEmailBytes(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + + email_ = value; + onChanged(); + return this; + } + + private com.google.protobuf.LazyStringList friends_ = com.google.protobuf.LazyStringArrayList.EMPTY; + + private void ensureFriendsIsMutable() { + if (!((bitField0_ & 0x00000001) != 0)) { + friends_ = new com.google.protobuf.LazyStringArrayList(friends_); + bitField0_ |= 0x00000001; + } + } + + /** + * <code>repeated string friends = 4;</code> + * @return A list containing the friends. + */ + public com.google.protobuf.ProtocolStringList getFriendsList() { + return friends_.getUnmodifiableView(); + } + + /** + * <code>repeated string friends = 4;</code> + * @return The count of friends. + */ + public int getFriendsCount() { + return friends_.size(); + } + + /** + * <code>repeated string friends = 4;</code> + * @param index The index of the element to return. + * @return The friends at the given index. + */ + public java.lang.String getFriends(int index) { + return friends_.get(index); + } + + /** + * <code>repeated string friends = 4;</code> + * @param index The index of the value to return. + * @return The bytes of the friends at the given index. + */ + public com.google.protobuf.ByteString getFriendsBytes(int index) { + return friends_.getByteString(index); + } + + /** + * <code>repeated string friends = 4;</code> + * @param index The index to set the value at. + * @param value The friends to set. + * @return This builder for chaining. + */ + public Builder setFriends(int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureFriendsIsMutable(); + friends_.set(index, value); + onChanged(); + return this; + } + + /** + * <code>repeated string friends = 4;</code> + * @param value The friends to add. + * @return This builder for chaining. 
+ */ + public Builder addFriends(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureFriendsIsMutable(); + friends_.add(value); + onChanged(); + return this; + } + + /** + * <code>repeated string friends = 4;</code> + * @param values The friends to add. + * @return This builder for chaining. + */ + public Builder addAllFriends(java.lang.Iterable<java.lang.String> values) { + ensureFriendsIsMutable(); + com.google.protobuf.AbstractMessageLite.Builder.addAll(values, friends_); + onChanged(); + return this; + } + + /** + * <code>repeated string friends = 4;</code> + * @return This builder for chaining. + */ + public Builder clearFriends() { + friends_ = com.google.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + + /** + * <code>repeated string friends = 4;</code> + * @param value The bytes of the friends to add. + * @return This builder for chaining. + */ + public Builder addFriendsBytes(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + checkByteStringIsUtf8(value); + ensureFriendsIsMutable(); + friends_.add(value); + onChanged(); + return this; + } + + @java.lang.Override + public final Builder setUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields(final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + // @@protoc_insertion_point(builder_scope:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord) + } + + // @@protoc_insertion_point(class_scope:org.apache.pinot.plugin.inputformat.protobuf.SampleRecord) + private static final org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = new 
org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord(); + } + + public static org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final com.google.protobuf.Parser<SampleRecord> PARSER = + new com.google.protobuf.AbstractParser<SampleRecord>() { + @java.lang.Override + public SampleRecord parsePartialFrom(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new SampleRecord(input, extensionRegistry); + } + }; + + public static com.google.protobuf.Parser<SampleRecord> parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser<SampleRecord> getParserForType() { + return PARSER; + } + + @java.lang.Override + public org.apache.pinot.plugin.inputformat.protobuf.Sample.SampleRecord getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + } + + private static final com.google.protobuf.Descriptors.Descriptor + internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor; + private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { + return descriptor; + } + + private static com.google.protobuf.Descriptors.FileDescriptor descriptor; + + static { + java.lang.String[] descriptorData = + {"\n\014sample.proto\022,org.apache.pinot.plugin." 
+ "inputformat.protobuf\"H\n\014SampleRecord\022\014\n\004" + + "name\030\001 \001(\t\022\n\n\002id\030\002 \001(\005\022\r\n\005email\030\003 \001(\t\022\017\n" + + "\007friends\030\004 \003(\tb\006proto3"}; + descriptor = com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[]{}); + internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_fieldAccessorTable = + new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_org_apache_pinot_plugin_inputformat_protobuf_SampleRecord_descriptor, + new java.lang.String[]{"Name", "Id", "Email", "Friends",}); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.desc b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.desc new file mode 100644 index 0000000..b863b5a --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.desc @@ -0,0 +1,8 @@ + +� +sample.proto,org.apache.pinot.plugin.inputformat.protobuf"b +SampleRecord +name ( Rname +id (Rid +email ( Remail +friends ( Rfriendsbproto3 \ No newline at end of file diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.proto b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.proto new file mode 100644 index 0000000..e27377b --- /dev/null +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/resources/sample.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; +package org.apache.pinot.plugin.inputformat.protobuf; + +message SampleRecord { + string name = 1; + int32 id = 2; + string email = 3; + repeated string friends = 4; +} diff --git a/pinot-plugins/pinot-input-format/pom.xml 
b/pinot-plugins/pinot-input-format/pom.xml index bf7dd9c..629f396 100644 --- a/pinot-plugins/pinot-input-format/pom.xml +++ b/pinot-plugins/pinot-input-format/pom.xml @@ -48,6 +48,7 @@ <module>pinot-parquet</module> <module>pinot-csv</module> <module>pinot-thrift</module> + <module>pinot-protobuf</module> </modules> <dependencies> diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java index 532fd16..6e62b29 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/FileFormat.java @@ -19,5 +19,5 @@ package org.apache.pinot.spi.data.readers; public enum FileFormat { - AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, ORC, OTHER + AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, ORC, PROTO, OTHER } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceFinder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceFinder.java new file mode 100644 index 0000000..7c5644e --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/ResourceFinder.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
/**
 * Utility class with helper methods for opening a resource as an {@link InputStream}, either from the
 * file system or through a {@link ClassLoader}.
 *
 * <p>Every method hands ownership of the returned stream to the caller, who is responsible for closing it.
 */
public class ResourceFinder {

  /**
   * Opens the file identified by the given URI.
   *
   * @param uri location of the resource; it is converted with {@link File#File(URI)}, so it must be a URI
   *            that constructor accepts
   * @return a newly opened stream over the file contents
   * @throws IOException if the file cannot be opened for reading
   */
  public static InputStream openResource(URI uri)
      throws IOException {
    return new FileInputStream(new File(uri));
  }

  /**
   * Opens a resource given by path name. An absolute path is read straight from the file system; any other
   * path is looked up through the supplied class loader.
   *
   * @param classLoader class loader used to look up the resource when {@code pathName} is not absolute
   * @param pathName absolute or relative path of the resource
   * @return a newly opened stream over the resource contents, never {@code null}
   * @throws IOException if the file cannot be opened, or if the class loader cannot find the resource
   */
  public static InputStream openResource(ClassLoader classLoader, String pathName)
      throws IOException {
    if (Paths.get(pathName).isAbsolute()) {
      return new FileInputStream(pathName);
    }
    return openResourceWithRelativePath(classLoader, pathName);
  }

  /**
   * Opens a resource through the supplied class loader.
   *
   * @param classLoader class loader used to look up the resource
   * @param pathName path of the resource, as understood by {@link ClassLoader#getResourceAsStream(String)}
   * @return a newly opened stream over the resource contents, never {@code null}
   * @throws IOException if the class loader cannot find the resource
   */
  public static InputStream openResourceWithRelativePath(ClassLoader classLoader, String pathName)
      throws IOException {
    InputStream stream = classLoader.getResourceAsStream(pathName);
    if (stream == null) {
      // getResourceAsStream reports a missing resource by returning null. Fail here with the same exception
      // type the file-system branches use, instead of letting a null stream surface later as an NPE.
      throw new java.io.FileNotFoundException("Resource not found via class loader: " + pathName);
    }
    return stream;
  }
}