stevenzwu commented on code in PR #14040:
URL: https://github.com/apache/iceberg/pull/14040#discussion_r2356971417
##########
core/src/test/java/org/apache/iceberg/TestInternalData.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestInternalData {
+
+  @Parameter(index = 0)
+  private FileFormat format;
+
+  @Parameters(name = " format = {0}")
+  protected static List<Object> parameters() {
+    return Arrays.asList(new Object[] {FileFormat.AVRO}, new Object[] {FileFormat.PARQUET});
+  }
+
+  private static final Schema SIMPLE_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "name", Types.StringType.get()));
+
+  private static final Schema NESTED_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "outer_id", Types.LongType.get()),
+          Types.NestedField.optional(
+              2,
+              "nested_struct",
+              Types.StructType.of(
+                  Types.NestedField.optional(3, "inner_id", Types.LongType.get()),
+                  Types.NestedField.optional(4, "inner_name", Types.StringType.get()))));
+
+  @TempDir private Path tempDir;
+
+  private final FileIO fileIO = new TestTables.LocalFileIO();
+
+  @TestTemplate
+  public void testCustomRootType() throws IOException {
+    OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());
+
+    List<Record> testData = RandomInternalData.generate(SIMPLE_SCHEMA, 1000, 1L);
+
+    try (FileAppender<Record> appender =
+        InternalData.write(format, outputFile).schema(SIMPLE_SCHEMA).build()) {
+      appender.addAll(testData);
+    }
+
+    InputFile inputFile = fileIO.newInputFile(outputFile.location());
+    List<PartitionData> readRecords = Lists.newArrayList();
+
+    try (CloseableIterable<PartitionData> reader =
+        InternalData.read(format, inputFile)
+            .project(SIMPLE_SCHEMA)
+            .setRootType(PartitionData.class)
+            .build()) {
+      for (PartitionData record : reader) {
+        readRecords.add(record);
+      }
+    }
+
+    assertThat(readRecords).hasSize(testData.size());
+
+    for (int i = 0; i < testData.size(); i++) {
+      Record expected = testData.get(i);
+      PartitionData actual = readRecords.get(i);
+
+      assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class));
+      assertThat(actual.get(1, String.class)).isEqualTo(expected.get(1, String.class));
+    }
+  }
+
+  @TestTemplate
+  public void testCustomTypeForNestedField() throws IOException {
+    OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());
+
+    List<Record> testData = RandomInternalData.generate(NESTED_SCHEMA, 1000, 1L);
+
+    try (FileAppender<Record> appender =
+        InternalData.write(format, outputFile).schema(NESTED_SCHEMA).build()) {
+      appender.addAll(testData);
+    }
+
+    InputFile inputFile = fileIO.newInputFile(outputFile.location());
+    List<Record> readRecords = Lists.newArrayList();
+
+    try (CloseableIterable<Record> reader =
+        InternalData.read(format, inputFile)
+            .project(NESTED_SCHEMA)
+            .setCustomType(2, TestHelpers.CustomRow.class)
+            .build()) {
+      for (Record record : reader) {
+        readRecords.add(record);
+      }
+    }
+
+    assertThat(readRecords).hasSize(testData.size());
+
+    for (int i = 0; i < testData.size(); i++) {
+      Record expected = testData.get(i);
+      Record actual = readRecords.get(i);
+
+      assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class));
+
+      Object expectedNested = expected.get(1);
+      Object actualNested = actual.get(1);
+
+      if (expectedNested == null && actualNested == null) {
+        continue;
+      }
+
+      if (actualNested != null) {
+        assertThat(actualNested)
+            .as("Custom type should be TestCustomRow, but was: " + actualNested.getClass())

Review Comment:
   nit: the error message still referencing `TestCustomRow` should be updated to `TestHelpers.CustomRow`
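   For example, keeping everything else the same (a sketch of the one-line message fix):

   ```java
   .as("Custom type should be TestHelpers.CustomRow, but was: " + actualNested.getClass())
   ```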
##########
parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java:
##########
@@ -1220,35 +1274,43 @@ public ReadBuilder createReaderFunc(
           this.batchedReaderFunc == null,
           "Cannot set reader function: batched reader function already set");
       Preconditions.checkArgument(
-          this.readerFuncWithSchema == null,
-          "Cannot set reader function: 2-argument reader function already set");
-      this.readerFunc = newReaderFunction;
+          this.readerFunction == null, "Cannot set reader function: reader function already set");

Review Comment:
   There is a slight behavior change here. Previously, it was OK to set the unary reader function multiple times; with the unary and binary reader functions combined into one field, that is no longer allowed. But I am OK with the slight behavior change, as I don't think it is necessary to allow setting the same reader function type multiple times.
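   A minimal sketch of the difference (hypothetical caller code, not from this PR; `ParquetValueReaders.nulls()` is just a stand-in reader):

   ```java
   // first call is fine, as before
   Parquet.ReadBuilder builder =
       Parquet.read(inputFile).createReaderFunc(fileSchema -> ParquetValueReaders.nulls());

   // before this PR the second call silently replaced the first reader function;
   // with the combined field it now fails the precondition:
   builder.createReaderFunc(fileSchema -> ParquetValueReaders.nulls());
   // -> IllegalArgumentException: Cannot set reader function: reader function already set
   ```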
##########
parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java:
##########
@@ -20,37 +20,87 @@
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.MessageType;

 public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {
+  private final Map<Integer, Class<? extends StructLike>> typesById = Maps.newHashMap();
+
   private static final InternalReader<?> INSTANCE = new InternalReader<>();

   private InternalReader() {}

   @SuppressWarnings("unchecked")
   public static <T extends StructLike> ParquetValueReader<T> create(
-      Schema expectedSchema, MessageType fileSchema) {
-    return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema);
+      Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {

Review Comment:
   nit: it seems that we switched the order of these two public static factory methods

##########
parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java:
##########
@@ -1161,6 +1160,61 @@ public static class ReadBuilder implements InternalData.ReadBuilder {
     private NameMapping nameMapping = null;
     private ByteBuffer fileEncryptionKey = null;
     private ByteBuffer fileAADPrefix = null;
+    private Class<? extends StructLike> rootType = null;
+    private Map<Integer, Class<? extends StructLike>> customTypes = Maps.newHashMap();
+
+    public interface ReaderFunction {

Review Comment:
   I really like this new elegant design. Much cleaner.

##########
core/src/test/java/org/apache/iceberg/TestInternalData.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestInternalData {
+
+  @Parameter(index = 0)
+  private FileFormat format;
+
+  @Parameters(name = " format = {0}")
+  protected static List<Object> parameters() {
+    return Arrays.asList(new Object[] {FileFormat.AVRO}, new Object[] {FileFormat.PARQUET});
+  }
+
+  private static final Schema SIMPLE_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "name", Types.StringType.get()));
+
+  private static final Schema NESTED_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "outer_id", Types.LongType.get()),
+          Types.NestedField.optional(
+              2,
+              "nested_struct",
+              Types.StructType.of(
+                  Types.NestedField.optional(3, "inner_id", Types.LongType.get()),
+                  Types.NestedField.optional(4, "inner_name", Types.StringType.get()))));
+
+  @TempDir private Path tempDir;
+
+  private final FileIO fileIO = new TestTables.LocalFileIO();
+

Review Comment:
   Should we add a test for the default `Record` type?
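   Something along these lines, maybe (a rough, untested sketch; it reads into `StructLike` so it does not assume which concrete record class the default reader returns):

   ```java
   @TestTemplate
   public void testDefaultRecordType() throws IOException {
     OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());
     List<Record> testData = RandomInternalData.generate(SIMPLE_SCHEMA, 1000, 1L);

     try (FileAppender<Record> appender =
         InternalData.write(format, outputFile).schema(SIMPLE_SCHEMA).build()) {
       appender.addAll(testData);
     }

     // no setRootType(...) / setCustomType(...): read with the default record type
     List<StructLike> readRecords = Lists.newArrayList();
     try (CloseableIterable<StructLike> reader =
         InternalData.read(format, fileIO.newInputFile(outputFile.location()))
             .project(SIMPLE_SCHEMA)
             .build()) {
       reader.forEach(readRecords::add);
     }

     assertThat(readRecords).hasSize(testData.size());
   }
   ```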
##########
core/src/test/java/org/apache/iceberg/TestInternalData.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestInternalData {
+
+  @Parameter(index = 0)
+  private String format;
+
+  @Parameters(name = "format={0}")
+  public static Object[][] parameters() {
+    return new Object[][] {{"avro"}, {"parquet"}};
+  }
+
+  private static final Schema NESTED_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "outer_id", Types.LongType.get()),
+          Types.NestedField.optional(
+              2,
+              "nested_struct",
+              Types.StructType.of(
+                  Types.NestedField.optional(3, "inner_id", Types.LongType.get()),
+                  Types.NestedField.optional(4, "inner_name", Types.StringType.get()))));
+
+  @TempDir private Path tempDir;
+
+  private final FileIO fileIO = new TestTables.LocalFileIO();
+
+  @TestTemplate
+  public void testCustomRootType() throws IOException {
+    FileFormat fileFormat = FileFormat.fromString(format);
+    OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());
+
+    List<Record> testData = createSimpleTestRecords();
+
+    try (FileAppender<Record> appender =
+        InternalData.write(fileFormat, outputFile).schema(simpleSchema()).build()) {
+      appender.addAll(testData);
+    }
+
+    InputFile inputFile = fileIO.newInputFile(outputFile.location());
+    List<PartitionData> readRecords = Lists.newArrayList();
+
+    try (CloseableIterable<PartitionData> reader =
+        InternalData.read(fileFormat, inputFile)
+            .project(simpleSchema())
+            .setRootType(PartitionData.class)
+            .build()) {
+      for (PartitionData record : reader) {
+        readRecords.add(record);
+      }
+    }
+
+    assertThat(readRecords).hasSize(testData.size());
+
+    for (int i = 0; i < testData.size(); i++) {
+      Record expected = testData.get(i);
+      PartitionData actual = readRecords.get(i);
+
+      assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class));
+      assertThat(actual.get(1, String.class)).isEqualTo(expected.get(1, String.class));
+    }
+  }
+
+  @TestTemplate
+  public void testCustomTypeForNestedField() throws IOException {
+    FileFormat fileFormat = FileFormat.fromString(format);
+    OutputFile outputFile = fileIO.newOutputFile(tempDir.resolve("test." + format).toString());
+
+    List<Record> testData = createNestedTestRecords();
+
+    try (FileAppender<Record> appender =
+        InternalData.write(fileFormat, outputFile).schema(NESTED_SCHEMA).build()) {
+      appender.addAll(testData);
+    }
+
+    InputFile inputFile = fileIO.newInputFile(outputFile.location());
+    List<Record> readRecords = Lists.newArrayList();
+
+    try (CloseableIterable<Record> reader =
+        InternalData.read(fileFormat, inputFile)
+            .project(NESTED_SCHEMA)
+            .setCustomType(2, TestCustomRow.class)
+            .build()) {
+      for (Record record : reader) {
+        readRecords.add(record);
+      }
+    }
+
+    assertThat(readRecords).hasSize(testData.size());
+
+    for (int i = 0; i < testData.size(); i++) {
+      Record expected = testData.get(i);
+      Record actual = readRecords.get(i);
+
+      assertThat(actual.get(0, Long.class)).isEqualTo(expected.get(0, Long.class));
+
+      Object expectedNested = expected.get(1);
+      Object actualNested = actual.get(1);
+
+      if (expectedNested == null && actualNested == null) {
+        continue;
+      }
+
+      if (actualNested != null) {

Review Comment:
   Maybe just replace lines 139-141 with an assertion that either both are null or both are non-null.
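   For example (sketch):

   ```java
   assertThat(actualNested == null)
       .as("nested struct nullability should match")
       .isEqualTo(expectedNested == null);
   ```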
