rdblue commented on code in PR #13445:
URL: https://github.com/apache/iceberg/pull/13445#discussion_r2292280138


##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java:
##########
@@ -79,6 +84,123 @@ public static <T> ParquetValueWriter<T> 
buildWriter(StructType dfSchema, Message
         ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new 
WriteBuilder(type));
   }
 
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(Schema iSchema, 
MessageType type) {

Review Comment:
   @Fokko, I looked into this and I think that there's a cleaner way to do this 
using the original builder structure and aligning the fields the same way that 
the other writers do.
   
   Locally, I updated the `writerToFieldIndex` method to work with a Spark 
`StructType`, then updated the `InternalRowWriter` so that it calculates the 
field indexes and passes them to the base `StructWriter`:
   
   ```java
     private static class InternalRowWriter extends 
ParquetValueWriters.StructWriter<InternalRow> {
       static InternalRowWriter create(StructType struct, 
List<ParquetValueWriter<?>> writers) {
         int[] fieldIndexes = writerToFieldIndex(struct, writers.size());
         return new InternalRowWriter(struct, fieldIndexes, writers);
       }
   
       private final StructField[] fields;
       private final int[] fieldIndexes;
   
       private InternalRowWriter(
           StructType struct, int[] fieldIndexes, List<ParquetValueWriter<?>> 
writers) {
         super(fieldIndexes, writers);
         this.fields = struct.fields();
         this.fieldIndexes = fieldIndexes;
       }
   
       @Override
       protected Object get(InternalRow struct, int index) {
         return struct.get(index, fields[fieldIndexes[index]].dataType());
       }
     }
   
     /** Returns a mapping from writer index to field index, skipping Unknown 
columns. */
     private static int[] writerToFieldIndex(StructType struct, int numWriters) 
{
       if (null == struct) {
         return IntStream.rangeClosed(0, numWriters).toArray();
       }
   
       StructField[] fields = struct.fields();
   
       // value writer index to record field index
       int[] indexes = new int[numWriters];
       int writerIndex = 0;
       for (int pos = 0; pos < fields.length; pos += 1) {
         if (!(fields[pos].dataType() instanceof NullType)) {
           indexes[writerIndex] = pos;
           writerIndex += 1;
         }
       }
   
       return indexes;
     }
   ```
   
   The base `StructWriter` already has a constructor that accepts the 
`fieldIndexes` array so it works well. Then this builder no longer needs to use 
the Iceberg type and can be mostly unchanged. The new `struct` implementation 
is simpler as well:
   
   ```java
       @Override
       public ParquetValueWriter<?> struct(
           StructType sStruct, GroupType struct, List<ParquetValueWriter<?>> 
fieldWriters) {
         List<Type> fields = struct.getFields();
         List<ParquetValueWriter<?>> writers = 
Lists.newArrayListWithExpectedSize(fieldWriters.size());
         for (int i = 0; i < fields.size(); i += 1) {
           writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
         }
         return InternalRowWriter.create(sStruct, writers);
       }
   ```
   
   Then the updated builder can pass in the unmodified DF schema (as was passed 
before) and the existing Parquet conversion that omits the unknown fields 
works. I think that removes the need to have ways to prune unknown from either 
the Iceberg or Spark schemas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to