laskoviymishka commented on code in PR #16370: URL: https://github.com/apache/iceberg/pull/16370#discussion_r3442591964
########## data/src/main/java/org/apache/iceberg/data/RecordVariantShreddingAnalyzer.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.parquet.VariantShreddingAnalyzer; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantValue; + +/** + * Generic {@link Record} implementation that extracts variant values from {@link Record#get(int)} + * using positional indices aligned with {@link Schema#columns()}. + * + * <p>Buffered rows must be laid out against the same {@link Schema} passed as {@code engineSchema}; + * otherwise {@link Record#get(int)} positions will not match the resolved column indices. 
+ */ +class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> { + + private final Map<Schema, Map<String, Integer>> columnIndicesBySchema = Maps.newHashMap(); Review Comment: This analyzer is built once in `GenericFormatModels.register()` and lives on the `ParquetFormatModel` singleton, so this map is effectively process-global mutable state. `resolveColumnIndex` writes to it with `computeIfAbsent`. If two writer threads hit it at once (a Kafka Connect (KC) sink with multiple workers, say), we get concurrent structural modification of a plain `HashMap`: possible corruption or a spin, not just a lost update. A second problem sits on top of that. `Schema` doesn't override `equals`/`hashCode`, so the map is keyed on object identity and never evicts, and a long-lived sink that goes through schema evolution grows it without bound. The Spark analyzer stays stateless precisely to avoid both. I'd drop the cache entirely and build the index map locally in `resolveColumnIndex`. That costs one linear scan over the top-level fields (typically fewer than 100) per buffered batch, which is noise on this path. wdyt?
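A minimal sketch of the stateless alternative, assuming the top-level column names can be read in schema order (in the real analyzer they would come from `Schema.columns()` and `NestedField.name()`). `StatelessColumnIndex` and its methods are hypothetical, and plain `String` names stand in for Iceberg types so the snippet runs on its own:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the analyzer's index lookup; not the Iceberg API.
// The class has no fields: every call builds its own map, so concurrent callers
// share nothing and no identity-keyed cache outlives the call.
final class StatelessColumnIndex {
  private StatelessColumnIndex() {}

  // Builds a name -> position map in one linear pass over the top-level columns.
  static Map<String, Integer> indexByName(List<String> columnNames) {
    Map<String, Integer> indices = new HashMap<>();
    for (int pos = 0; pos < columnNames.size(); pos += 1) {
      indices.put(columnNames.get(pos), pos);
    }
    return indices;
  }

  // Resolves the positional index of a column, failing loudly if it is missing.
  static int resolveColumnIndex(List<String> columnNames, String name) {
    Integer pos = indexByName(columnNames).get(name);
    if (pos == null) {
      throw new IllegalArgumentException("Cannot find column: " + name);
    }
    return pos;
  }

  public static void main(String[] args) {
    List<String> variantAfterId = List.of("id", "v");
    List<String> variantBeforeId = List.of("v", "id");
    System.out.println(resolveColumnIndex(variantAfterId, "v")); // 1
    System.out.println(resolveColumnIndex(variantBeforeId, "v")); // 0
  }
}
```

Because the helper holds no state, one analyzer instance can be shared by every writer thread without synchronization, and schema evolution cannot leave entries behind.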
########## parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java: ########## @@ -151,23 +152,31 @@ private WriteBuilderWrapper( EncryptedOutputFile outputFile, WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction, VariantShreddingAnalyzer<D, S> variantAnalyzer, - Function<S, UnaryOperator<D>> copyFuncFactory) { + Function<S, UnaryOperator<D>> copyFuncFactory, + Class<S> schemaType) { this.internal = Parquet.write(outputFile); this.writerFunction = writerFunction; this.variantAnalyzer = variantAnalyzer; this.copyFuncFactory = copyFuncFactory; + this.schemaType = schemaType; } @Override + @SuppressWarnings("unchecked") public ModelWriteBuilder<D, S> schema(Schema newSchema) { this.schema = newSchema; internal.schema(newSchema); + if (this.engineSchema == null && Schema.class.equals(schemaType)) { + this.engineSchema = (S) newSchema; + } return this; } @Override public ModelWriteBuilder<D, S> engineSchema(S newSchema) { - this.engineSchema = newSchema; + if (newSchema != null) { Review Comment: This makes a public setter on `ModelWriteBuilder` silently ignore null, with nothing in the contract saying so. Any caller that passes null to reset/clear now gets a no-op and no signal. I think the cleaner home for this guard is `RegistryBasedFileWriterFactory.newDataWriter` — only call `.engineSchema(inputSchema())` when `inputSchema()` is non-null, and leave this setter as a plain assignment. That keeps the KC-null intent without overloading the interface method. wdyt? 
########## parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java: ########## @@ -151,23 +152,31 @@ private WriteBuilderWrapper( EncryptedOutputFile outputFile, WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction, VariantShreddingAnalyzer<D, S> variantAnalyzer, - Function<S, UnaryOperator<D>> copyFuncFactory) { + Function<S, UnaryOperator<D>> copyFuncFactory, + Class<S> schemaType) { this.internal = Parquet.write(outputFile); this.writerFunction = writerFunction; this.variantAnalyzer = variantAnalyzer; this.copyFuncFactory = copyFuncFactory; + this.schemaType = schemaType; } @Override + @SuppressWarnings("unchecked") public ModelWriteBuilder<D, S> schema(Schema newSchema) { this.schema = newSchema; internal.schema(newSchema); + if (this.engineSchema == null && Schema.class.equals(schemaType)) { Review Comment: The correctness of this whole change hinges on `schema()` being called before `engineSchema()`: `schema()` auto-derives the engine schema, then `engineSchema(null)` is dropped by the null-guard in `engineSchema()` below, so the derived value survives. Reverse the call order and the auto-derive sees a non-null `engineSchema` and skips. That ordering is invisible to anyone implementing the builder. I'd make this order-independent: keep an explicit `engineSchemaWasSet` flag, and do the auto-derive in `build()` only when the flag is false. Then `schema()`/`engineSchema()` can be called in either order and a real engine schema is never clobbered. Separately, using `Schema.class.equals(schemaType)` as the signal leaks schema-type semantics into the parquet layer — any future model that uses `Schema` as its engine-schema token inherits an auto-derive it might not want. An explicit flag on `create()` (or a `shouldDeriveEngineSchema()` hook) would be more honest than a class-identity check. Thoughts? 
########## data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java: ########## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import 
org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetFileTestUtils; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestRecordVariantShreddingAnalyzer { + + private static final Schema VARIANT_AFTER_ID_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "v", Types.VariantType.get())); + + private static final Schema VARIANT_BEFORE_ID_SCHEMA = + new Schema( + Types.NestedField.optional(1, "v", Types.VariantType.get()), + Types.NestedField.required(2, "id", Types.LongType.get())); + + private Variant variant; + private List<Record> records; + + @TempDir private Path temp; + + @BeforeEach + public void before() { + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); + VariantMetadata metadata = Variants.metadata(metadataBuffer); + ByteBuffer objectBuffer = + VariantTestUtil.createObject( + metadataBuffer, + ImmutableMap.of( + "a", Variants.of(42), + "b", Variants.of("hello"))); + variant = Variant.of(metadata, Variants.value(metadata, objectBuffer)); + + 
GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + records = + ImmutableList.of( + record.copy(ImmutableMap.of("id", 1L, "v", variant)), + record.copy(ImmutableMap.of("id", 2L, "v", variant)), + record.copy(ImmutableMap.of("id", 3L, "v", variant))); + } + + @Test + public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2); + GroupType typedValue = shreddedTypes.get(2).asGroupType(); + assertThat(typedValue.getName()).isEqualTo("typed_value"); + assertThat(typedValue.containsField("a")).isTrue(); + assertThat(typedValue.containsField("b")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() { + GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA); + List<Record> variantFirstRecords = + ImmutableList.of( + record.copy(ImmutableMap.of("v", variant, "id", 1L)), + record.copy(ImmutableMap.of("v", variant, "id", 2L))); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, VARIANT_BEFORE_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(1); + assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsSkipsNullVariantValues() { + GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withVariant.setField("id", 1L); + withVariant.setField("v", variant); + + GenericRecord withNullVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withNullVariant.setField("id", 2L); + withNullVariant.setField("v", null); + + GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); 
+ withVariant2.setField("id", 3L); + withVariant2.setField("v", variant); + + List<Record> recordsWithNulls = ImmutableList.of(withVariant, withNullVariant, withVariant2); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2); + assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue(); + assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsWithAllNullVariantValues() { + GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + nullVariant1.setField("id", 1L); + nullVariant1.setField("v", null); + + GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + nullVariant2.setField("id", 2L); + nullVariant2.setField("v", null); + + List<Record> allNullVariants = ImmutableList.of(nullVariant1, nullVariant2); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).isEmpty(); + } + + @Test + public void testAnalyzeVariantColumnsRejectsNonVariantValues() { + GenericRecord invalidRecord = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + invalidRecord.setField("id", 1L); + invalidRecord.setField("v", "not-a-variant"); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + assertThatThrownBy( + () -> + analyzer.analyzeVariantColumns( + ImmutableList.of(invalidRecord), + VARIANT_AFTER_ID_SCHEMA, + VARIANT_AFTER_ID_SCHEMA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Expected Variant at index 1 but was: java.lang.String"); + } + + @Test + public void 
testAnalyzeVariantColumnsRejectsNullEngineSchema() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + assertThatThrownBy(() -> analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Invalid engine schema: null"); + } + + @Test + public void testGenericFileWriterFactoryShreddingRoundTrip() throws IOException { + Table table = + TestTables.create( + temp.resolve("table").toFile(), + "variant", + VARIANT_AFTER_ID_SCHEMA, + PartitionSpec.unpartitioned(), + 3); + try { + GenericFileWriterFactory writerFactory = + new GenericFileWriterFactory.Builder(table) + .dataFileFormat(FileFormat.PARQUET) + .dataSchema(VARIANT_AFTER_ID_SCHEMA) + .writerProperties( + ImmutableMap.of( + TableProperties.PARQUET_SHRED_VARIANTS, "true", + TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2")) + .build(); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build(); + EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile(); + + // KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema. 
+ try (DataWriter<Record> writer = + writerFactory.newDataWriter(encryptedOutputFile, table.spec(), null)) { + for (Record rec : records) { + writer.write(rec); + } + } + + OutputFile outputFile = encryptedOutputFile.encryptingOutputFile(); + try (ParquetFileReader reader = + ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) { + assertShreddedVariantParquetSchema(reader.getFooter().getFileMetaData().getSchema()); + } + + assertAllRawParquetRowsShredded(outputFile); + assertRecordsRoundTrip(outputFile); + } finally { + TestTables.clearTables(); + } + } + + @Test + public void testFormatModelRegistryShreddingRoundTrip() throws IOException { + OutputFile outputFile = Files.localOutput(temp.resolve("variant-shredded.parquet").toFile()); + EncryptedOutputFile encryptedOutputFile = EncryptedFiles.plainAsEncryptedOutput(outputFile); + + FileWriterBuilder<DataWriter<Record>, Object> writeBuilder = + FormatModelRegistry.dataWriteBuilder(FileFormat.PARQUET, Record.class, encryptedOutputFile); + + try (DataWriter<Record> writer = + writeBuilder + .schema(VARIANT_AFTER_ID_SCHEMA) + .spec(PartitionSpec.unpartitioned()) + .setAll( + ImmutableMap.of( + TableProperties.PARQUET_SHRED_VARIANTS, "true", + TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2")) + .build()) { + for (Record rec : records) { + writer.write(rec); + } + } + + try (ParquetFileReader reader = + ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) { + assertShreddedVariantParquetSchema(reader.getFooter().getFileMetaData().getSchema()); + } + + assertAllRawParquetRowsShredded(outputFile); + assertRecordsRoundTrip(outputFile); + } + + private void assertShreddedVariantParquetSchema(MessageType parquetSchema) { + GroupType variantGroup = parquetSchema.getType("v").asGroupType(); + assertThat(variantGroup.containsField("typed_value")).isTrue(); + + GroupType typedValue = variantGroup.getType("typed_value").asGroupType(); + 
+ assertThat(typedValue.containsField("a")).isTrue(); + assertThat(typedValue.containsField("b")).isTrue(); + } + + private void assertShreddedTypedValueOnRow(Group row) { + Group variantData = row.getGroup("v", 0); + assertThat(variantData.getFieldRepetitionCount("value")).isEqualTo(0); + + Group typedValue = variantData.getGroup("typed_value", 0); + assertThat(typedValue.getGroup("a", 0).getInteger("typed_value", 0)).isEqualTo(42); + assertThat(typedValue.getGroup("b", 0).getString("typed_value", 0)).isEqualTo("hello"); + } + + private void assertAllRawParquetRowsShredded(OutputFile outputFile) throws IOException { + try (ParquetReader<Group> rawReader = + ParquetReader.builder( + new GroupReadSupport(), new org.apache.hadoop.fs.Path(outputFile.location())) Review Comment: `org.apache.hadoop.fs.Path` is fully qualified inline to avoid clashing with the `java.nio.file.Path` import used for `@TempDir`. The idiomatic fix is to import `org.apache.hadoop.fs.Path` and declare the temp dir as `@TempDir private java.nio.file.Path temp;`. The inline `org.apache.iceberg.data.parquet.GenericParquetReaders` in `assertRecordsRoundTrip` can likewise become a regular import. Minor, but it removes two inline FQNs from the helper methods. ########## data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java: ########## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetFileTestUtils; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantTestUtil; +import 
org.apache.iceberg.variants.Variants; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestRecordVariantShreddingAnalyzer { + + private static final Schema VARIANT_AFTER_ID_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "v", Types.VariantType.get())); + + private static final Schema VARIANT_BEFORE_ID_SCHEMA = + new Schema( + Types.NestedField.optional(1, "v", Types.VariantType.get()), + Types.NestedField.required(2, "id", Types.LongType.get())); + + private Variant variant; + private List<Record> records; + + @TempDir private Path temp; + + @BeforeEach + public void before() { + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); + VariantMetadata metadata = Variants.metadata(metadataBuffer); + ByteBuffer objectBuffer = + VariantTestUtil.createObject( + metadataBuffer, + ImmutableMap.of( + "a", Variants.of(42), + "b", Variants.of("hello"))); + variant = Variant.of(metadata, Variants.value(metadata, objectBuffer)); + + GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + records = + ImmutableList.of( + record.copy(ImmutableMap.of("id", 1L, "v", variant)), + record.copy(ImmutableMap.of("id", 2L, "v", variant)), + record.copy(ImmutableMap.of("id", 3L, "v", variant))); + } + + @Test + public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns(records, 
VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2); + GroupType typedValue = shreddedTypes.get(2).asGroupType(); + assertThat(typedValue.getName()).isEqualTo("typed_value"); + assertThat(typedValue.containsField("a")).isTrue(); + assertThat(typedValue.containsField("b")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() { + GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA); + List<Record> variantFirstRecords = + ImmutableList.of( + record.copy(ImmutableMap.of("v", variant, "id", 1L)), + record.copy(ImmutableMap.of("v", variant, "id", 2L))); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, VARIANT_BEFORE_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(1); + assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsSkipsNullVariantValues() { + GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withVariant.setField("id", 1L); + withVariant.setField("v", variant); + + GenericRecord withNullVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withNullVariant.setField("id", 2L); + withNullVariant.setField("v", null); + + GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withVariant2.setField("id", 3L); + withVariant2.setField("v", variant); + + List<Record> recordsWithNulls = ImmutableList.of(withVariant, withNullVariant, withVariant2); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2); + 
assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue(); + assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsWithAllNullVariantValues() { + GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + nullVariant1.setField("id", 1L); + nullVariant1.setField("v", null); + + GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + nullVariant2.setField("id", 2L); + nullVariant2.setField("v", null); + + List<Record> allNullVariants = ImmutableList.of(nullVariant1, nullVariant2); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).isEmpty(); + } + + @Test + public void testAnalyzeVariantColumnsRejectsNonVariantValues() { + GenericRecord invalidRecord = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + invalidRecord.setField("id", 1L); + invalidRecord.setField("v", "not-a-variant"); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + assertThatThrownBy( + () -> + analyzer.analyzeVariantColumns( + ImmutableList.of(invalidRecord), + VARIANT_AFTER_ID_SCHEMA, + VARIANT_AFTER_ID_SCHEMA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Expected Variant at index 1 but was: java.lang.String"); + } + + @Test + public void testAnalyzeVariantColumnsRejectsNullEngineSchema() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + assertThatThrownBy(() -> analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Invalid engine schema: null"); + } + + @Test + public void testGenericFileWriterFactoryShreddingRoundTrip() throws IOException { + Table table = + TestTables.create( + 
+ temp.resolve("table").toFile(), + "variant", + VARIANT_AFTER_ID_SCHEMA, + PartitionSpec.unpartitioned(), + 3); + try { + GenericFileWriterFactory writerFactory = + new GenericFileWriterFactory.Builder(table) + .dataFileFormat(FileFormat.PARQUET) + .dataSchema(VARIANT_AFTER_ID_SCHEMA) + .writerProperties( + ImmutableMap.of( + TableProperties.PARQUET_SHRED_VARIANTS, "true", + TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2")) + .build(); + + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build(); + EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile(); + + // KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema. Review Comment: The comment points at the wrong null. The `null` passed as the third argument to `newDataWriter` on the next line is the `StructLike` partition, not the engine schema. The engine-schema null comes from `GenericFileWriterFactory` not forwarding an `inputSchema` to its superclass, so `inputSchema()` returns null and `RegistryBasedFileWriterFactory` calls `builder.engineSchema(null)`. I'd reword it so a reader doesn't go looking for the engine-schema null in the partition argument. ########## parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java: ########## @@ -151,23 +152,31 @@ private WriteBuilderWrapper( EncryptedOutputFile outputFile, WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction, VariantShreddingAnalyzer<D, S> variantAnalyzer, - Function<S, UnaryOperator<D>> copyFuncFactory) { + Function<S, UnaryOperator<D>> copyFuncFactory, + Class<S> schemaType) { this.internal = Parquet.write(outputFile); this.writerFunction = writerFunction; this.variantAnalyzer = variantAnalyzer; this.copyFuncFactory = copyFuncFactory; + this.schemaType = schemaType; } @Override + @SuppressWarnings("unchecked") Review Comment: The only unchecked cast is `(S) newSchema` on the auto-derive line.
I'd narrow the suppression to that cast, either by assigning it to an annotated local variable (`@SuppressWarnings("unchecked") S derived = (S) newSchema;`, since the annotation can go on a declaration but not on a bare assignment statement) or by moving it into a tiny helper, rather than covering the whole method, so a future cast in here doesn't get silently suppressed. ########## data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java: ########## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetFileTestUtils; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestRecordVariantShreddingAnalyzer { + + private static final Schema VARIANT_AFTER_ID_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "v", Types.VariantType.get())); + + private static final Schema VARIANT_BEFORE_ID_SCHEMA = + new Schema( + Types.NestedField.optional(1, "v", Types.VariantType.get()), + Types.NestedField.required(2, "id", Types.LongType.get())); + + private Variant variant; + private List<Record> records; + + @TempDir private Path temp; + + @BeforeEach + public void before() { + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); + VariantMetadata metadata = Variants.metadata(metadataBuffer); + ByteBuffer objectBuffer = + VariantTestUtil.createObject( + metadataBuffer, + ImmutableMap.of( + "a", Variants.of(42), + "b", Variants.of("hello"))); + variant = Variant.of(metadata, Variants.value(metadata, objectBuffer)); + + GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + records = + ImmutableList.of( + record.copy(ImmutableMap.of("id", 1L, "v", variant)), + record.copy(ImmutableMap.of("id", 2L, "v", variant)), + record.copy(ImmutableMap.of("id", 3L, "v", variant))); + } + + @Test + public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2); + GroupType typedValue = shreddedTypes.get(2).asGroupType(); + assertThat(typedValue.getName()).isEqualTo("typed_value"); + assertThat(typedValue.containsField("a")).isTrue(); + assertThat(typedValue.containsField("b")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() { + GenericRecord record = 
GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA); + List<Record> variantFirstRecords = + ImmutableList.of( + record.copy(ImmutableMap.of("v", variant, "id", 1L)), + record.copy(ImmutableMap.of("v", variant, "id", 2L))); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, VARIANT_BEFORE_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(1); + assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsSkipsNullVariantValues() { + GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withVariant.setField("id", 1L); + withVariant.setField("v", variant); + + GenericRecord withNullVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withNullVariant.setField("id", 2L); + withNullVariant.setField("v", null); + + GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + withVariant2.setField("id", 3L); + withVariant2.setField("v", variant); + + List<Record> recordsWithNulls = ImmutableList.of(withVariant, withNullVariant, withVariant2); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2); + assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue(); + assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue(); + } + + @Test + public void testAnalyzeVariantColumnsWithAllNullVariantValues() { + GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + nullVariant1.setField("id", 1L); + nullVariant1.setField("v", null); + + GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + nullVariant2.setField("id", 2L); + 
nullVariant2.setField("v", null); + + List<Record> allNullVariants = ImmutableList.of(nullVariant1, nullVariant2); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns( + allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).isEmpty(); + } + + @Test + public void testAnalyzeVariantColumnsRejectsNonVariantValues() { + GenericRecord invalidRecord = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + invalidRecord.setField("id", 1L); + invalidRecord.setField("v", "not-a-variant"); + + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + assertThatThrownBy( + () -> + analyzer.analyzeVariantColumns( + ImmutableList.of(invalidRecord), + VARIANT_AFTER_ID_SCHEMA, + VARIANT_AFTER_ID_SCHEMA)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Expected Variant at index 1 but was: java.lang.String"); + } + + @Test + public void testAnalyzeVariantColumnsRejectsNullEngineSchema() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + assertThatThrownBy(() -> analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Invalid engine schema: null"); + } + + @Test + public void testGenericFileWriterFactoryShreddingRoundTrip() throws IOException { + Table table = + TestTables.create( + temp.resolve("table").toFile(), + "variant", + VARIANT_AFTER_ID_SCHEMA, + PartitionSpec.unpartitioned(), + 3); + try { + GenericFileWriterFactory writerFactory = + new GenericFileWriterFactory.Builder(table) + .dataFileFormat(FileFormat.PARQUET) + .dataSchema(VARIANT_AFTER_ID_SCHEMA) + .writerProperties( + ImmutableMap.of( + TableProperties.PARQUET_SHRED_VARIANTS, "true", + TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2")) + .build(); + + OutputFileFactory fileFactory = + 
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build(); + EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile(); + + // KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema. + try (DataWriter<Record> writer = + writerFactory.newDataWriter(encryptedOutputFile, table.spec(), null)) { + for (Record rec : records) { + writer.write(rec); + } + } + + OutputFile outputFile = encryptedOutputFile.encryptingOutputFile(); + try (ParquetFileReader reader = + ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) { + assertShreddedVariantParquetSchema(reader.getFooter().getFileMetaData().getSchema()); + } + + assertAllRawParquetRowsShredded(outputFile); + assertRecordsRoundTrip(outputFile); + } finally { + TestTables.clearTables(); + } + } + + @Test + public void testFormatModelRegistryShreddingRoundTrip() throws IOException {

Review Comment: Both round-trip tests only exercise the auto-derive path. Neither one ever calls `engineSchema()` with a real, non-null `Schema`. Given that the `schema()`/`engineSchema()` precedence logic is the riskiest part of this PR, I'd add a `WriteBuilderWrapper`-level test that sets an explicit engine schema, then calls `schema(...)`, and asserts that the explicit one wins. Ideally a second test would make the calls in the reverse order. That's the case that pins down the protocol we're relying on.

########## data/src/main/java/org/apache/iceberg/data/RecordVariantShreddingAnalyzer.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.Schema; +import org.apache.iceberg.parquet.VariantShreddingAnalyzer; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantValue; + +/** + * Generic {@link Record} implementation that extracts variant values from {@link Record#get(int)} + * using positional indices aligned with {@link Schema#columns()}. + * + * <p>Buffered rows must be laid out against the same {@link Schema} passed as {@code engineSchema}; + * otherwise {@link Record#get(int)} positions will not match the resolved column indices. + */ +class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> { + + private final Map<Schema, Map<String, Integer>> columnIndicesBySchema = Maps.newHashMap(); + + RecordVariantShreddingAnalyzer() {} + + @Override + protected int resolveColumnIndex(Schema engineSchema, String columnName) { + Preconditions.checkNotNull(engineSchema, "Invalid engine schema: null"); + + Map<String, Integer> indices = + columnIndicesBySchema.computeIfAbsent( + engineSchema, RecordVariantShreddingAnalyzer::indexByName); + Integer index = indices.get(columnName); + return index != null ? index : -1;

Review Comment: When `resolveColumnIndex` returns `-1`, the base class quietly skips shredding for that column. If the engine schema and the Iceberg schema disagree on a name (after a rename, say), we'd silently write the column unshredded with no diagnostic. A `LOG.warn` here that includes the column name would make that failure mode discoverable instead of leaving it as a silent fallback. Not blocking, but this is an easy one to lose a day to.

########## data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java: ########## @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.data; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.formats.FileWriterBuilder; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.parquet.ParquetFileTestUtils; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestRecordVariantShreddingAnalyzer { + + private static final Schema VARIANT_AFTER_ID_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "v", Types.VariantType.get())); + + private static final Schema VARIANT_BEFORE_ID_SCHEMA = + new Schema( + Types.NestedField.optional(1, "v", Types.VariantType.get()), + Types.NestedField.required(2, "id", Types.LongType.get())); + + private Variant variant; + private List<Record> records; + + @TempDir private Path temp; + + @BeforeEach + public void before() { + ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); + VariantMetadata metadata = Variants.metadata(metadataBuffer); + ByteBuffer objectBuffer = + VariantTestUtil.createObject( + metadataBuffer, + ImmutableMap.of( + "a", Variants.of(42), + "b", Variants.of("hello"))); + variant = Variant.of(metadata, Variants.value(metadata, objectBuffer)); + + GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); + records = + ImmutableList.of( + record.copy(ImmutableMap.of("id", 1L, "v", variant)), + record.copy(ImmutableMap.of("id", 2L, "v", variant)), + record.copy(ImmutableMap.of("id", 3L, "v", variant))); + } + + @Test + public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() { + RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); + + Map<Integer, Type> shreddedTypes = + analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA); + + assertThat(shreddedTypes).containsOnlyKeys(2);

Review Comment: Every test here shreds exactly one variant column. Could we add one with two variant columns at non-adjacent positions (say `id LONG, v1 VARIANT, other STRING, v2 VARIANT`) and assert that both field IDs come back shredded?
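The positional step that such a test would pin down can be sketched without Iceberg types. This is a self-contained sketch under stated assumptions. `Column`, `indexByName`, and `variantPositions` are hypothetical stand-ins, not the PR's code and not Iceberg API. The name-to-position map is rebuilt on each call rather than cached, in line with the earlier suggestion to drop `columnIndicesBySchema`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Column stands in for Iceberg's NestedField and a List<Column>
// for Schema#columns(). None of these names come from the PR or from Iceberg's API.
public class VariantColumnIndexSketch {

  public static final class Column {
    public final int fieldId;
    public final String name;
    public final boolean variant;

    public Column(int fieldId, String name, boolean variant) {
      this.fieldId = fieldId;
      this.name = name;
      this.variant = variant;
    }
  }

  // Builds the name -> position map locally on every call, so there is no shared mutable cache.
  public static Map<String, Integer> indexByName(List<Column> columns) {
    Map<String, Integer> indices = new HashMap<>();
    for (int pos = 0; pos < columns.size(); pos++) {
      indices.put(columns.get(pos).name, pos);
    }
    return indices;
  }

  // Maps every variant column's field ID to its row position, not only the first variant.
  public static Map<Integer, Integer> variantPositions(List<Column> columns) {
    Map<String, Integer> byName = indexByName(columns);
    Map<Integer, Integer> positions = new HashMap<>();
    for (Column column : columns) {
      if (column.variant) {
        positions.put(column.fieldId, byName.get(column.name));
      }
    }
    return positions;
  }

  public static void main(String[] args) {
    // id LONG, v1 VARIANT, other STRING, v2 VARIANT
    List<Column> columns =
        List.of(
            new Column(1, "id", false),
            new Column(2, "v1", true),
            new Column(3, "other", false),
            new Column(4, "v2", true));

    Map<Integer, Integer> positions = variantPositions(columns);
    if (!positions.equals(Map.of(2, 1, 4, 3))) {
      throw new AssertionError("unexpected variant positions: " + positions);
    }
    System.out.println("variant field ID -> position: " + positions);
  }
}
```

In the real test, the assertion would presumably be made on the result of `analyzeVariantColumns(...)` instead, for example `containsOnlyKeys(2, 4)` for this layout. The sketch only isolates the positional lookup.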
That's the case that would catch an off-by-one in `indexByName` or an "only the first variant gets shredded" regression. The current single-column tests can't catch either one. While we're here: the literal `2` is the field ID of `v`, and pulling it into a named constant would read better than the bare key.

########## data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java: ########## @@ -44,11 +47,13 @@ public static void register() { FormatModelRegistry.register( ParquetFormatModel.create( Record.class, - Void.class, + Schema.class, (icebergSchema, fileSchema, engineSchema) -> GenericParquetWriter.create(icebergSchema, fileSchema), (icebergSchema, fileSchema, engineSchema, idToConstant) -> - GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant))); + GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant), + new RecordVariantShreddingAnalyzer(), + (Function<Schema, UnaryOperator<Record>>) engineSchema -> Record::copy));

Review Comment: The lambda takes `engineSchema`, ignores it, and returns `Record::copy` unconditionally. That's fine for this path. However, the `Function<S, UnaryOperator<D>>` abstraction exists so that engines can produce a schema-aware copy. Here it's pure indirection, and the parameter name reads as if it were used. If the generic path will never use the engine schema for copying, could we pass a plain `UnaryOperator<Record>` and drop the factory wrapper? If the factory has to stay for signature reasons, I'd rename the parameter to `ignored` so it doesn't mislead. wdyt?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
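This is a minimal self-contained sketch of the two shapes contrasted in the `GenericFormatModels.register()` comment above. `Row`, `Shape`, `COPY_FACTORY`, and `COPIER` are hypothetical stand-ins for `Record`, `Schema`, and the copy argument passed to `ParquetFormatModel.create(...)`. They are not Iceberg API.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical sketch: Row stands in for Iceberg's Record and Shape for Schema.
public class CopyFactorySketch {

  public static final class Row {
    public final long id;

    public Row(long id) {
      this.id = id;
    }

    public Row copy() {
      return new Row(id);
    }
  }

  public static final class Shape {}

  // Option A: keep the factory shape (e.g. to select an overload), but name the unused
  // parameter "ignored" so readers don't assume the copy depends on the schema.
  public static final Function<Shape, UnaryOperator<Row>> COPY_FACTORY = ignored -> Row::copy;

  // Option B: drop the factory and pass the copier directly.
  public static final UnaryOperator<Row> COPIER = Row::copy;

  public static void main(String[] args) {
    Row original = new Row(7L);
    Row viaFactory = COPY_FACTORY.apply(new Shape()).apply(original);
    Row direct = COPIER.apply(original);

    // Both paths return a new instance carrying the same contents.
    System.out.println(
        (viaFactory != original) + " " + (direct != original) + " " + (viaFactory.id == direct.id));
  }
}
```

Option B is what the comment proposes when the engine schema is never needed for copying. Option A only renames the parameter. The diff does not show whether `ParquetFormatModel.create(...)` has an overload that accepts a plain `UnaryOperator`, so that part is an assumption.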
