rdblue commented on code in PR #13445:
URL: https://github.com/apache/iceberg/pull/13445#discussion_r2292280138
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java:
##########
@@ -79,6 +84,123 @@ public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, Message
         ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
   }
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(Schema iSchema, MessageType type) {
Review Comment:
@Fokko, I looked into this and I think there's a cleaner way to do it: keep the original builder structure and align the fields the same way the other writers do.

Locally, I updated the `writerToFieldIndex` method to work with a Spark `StructType`, then updated `InternalRowWriter` so that it calculates the field indexes and passes them to the base `StructWriter`:
```java
private static class InternalRowWriter extends ParquetValueWriters.StructWriter<InternalRow> {
  static InternalRowWriter create(StructType struct, List<ParquetValueWriter<?>> writers) {
    int[] fieldIndexes = writerToFieldIndex(struct, writers.size());
    return new InternalRowWriter(struct, fieldIndexes, writers);
  }

  private final StructField[] fields;
  private final int[] fieldIndexes;

  private InternalRowWriter(
      StructType struct, int[] fieldIndexes, List<ParquetValueWriter<?>> writers) {
    super(fieldIndexes, writers);
    this.fields = struct.fields();
    this.fieldIndexes = fieldIndexes;
  }

  @Override
  protected Object get(InternalRow struct, int index) {
    // look up the Spark data type through the writer-to-field index mapping
    return struct.get(index, fields[fieldIndexes[index]].dataType());
  }
}

/** Returns a mapping from writer index to field index, skipping Unknown columns. */
private static int[] writerToFieldIndex(StructType struct, int numWriters) {
  if (null == struct) {
    // without a Spark schema to align with, fall back to an identity mapping
    return IntStream.range(0, numWriters).toArray();
  }

  StructField[] fields = struct.fields();

  // value writer index to record field index
  int[] indexes = new int[numWriters];
  int writerIndex = 0;
  for (int pos = 0; pos < fields.length; pos += 1) {
    if (!(fields[pos].dataType() instanceof NullType)) {
      indexes[writerIndex] = pos;
      writerIndex += 1;
    }
  }

  return indexes;
}
```
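
For illustration only (this schema is hypothetical, not from the PR), here is what that mapping produces when an unknown column shows up as `NullType`; the loop is the same one as in `writerToFieldIndex` above:

```java
// Standalone sketch: two value writers are aligned around a NullType (unknown) column.
import java.util.Arrays;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.NullType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class WriterIndexExample {
  public static void main(String[] args) {
    StructType struct =
        new StructType()
            .add("id", DataTypes.IntegerType)
            .add("unknown_col", DataTypes.NullType) // unknown column: no value writer
            .add("data", DataTypes.StringType);

    int numWriters = 2; // one writer per non-unknown field
    int[] indexes = new int[numWriters];
    int writerIndex = 0;
    StructField[] fields = struct.fields();
    for (int pos = 0; pos < fields.length; pos += 1) {
      if (!(fields[pos].dataType() instanceof NullType)) {
        indexes[writerIndex] = pos;
        writerIndex += 1;
      }
    }

    // writer 0 -> field 0 (id), writer 1 -> field 2 (data)
    System.out.println(Arrays.toString(indexes)); // prints [0, 2]
  }
}
```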
The base `StructWriter` already has a constructor that accepts the `fieldIndexes` array, so this works well. The builder then no longer needs the Iceberg type and can stay mostly unchanged. The new `struct` implementation is simpler as well:
```java
@Override
public ParquetValueWriter<?> struct(
    StructType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
  List<Type> fields = struct.getFields();
  List<ParquetValueWriter<?>> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size());
  for (int i = 0; i < fields.size(); i += 1) {
    writers.add(newOption(struct.getType(i), fieldWriters.get(i)));
  }

  return InternalRowWriter.create(sStruct, writers);
}
```
The updated builder can then be passed the unmodified DF schema (as before), and the existing Parquet conversion that omits the unknown fields still works. I think that removes the need for separate logic to prune unknown columns from either the Iceberg or Spark schemas.
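
To make the call-site shape concrete, here is a rough sketch with assumed helper and variable names (`newAppender`, `file`, `dfSchema` are hypothetical, not code from this PR); the point is that the writer func keeps receiving the unmodified Spark `StructType`:

```java
import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

class AppenderSketch {
  // hypothetical helper: builds a Parquet appender, handing the unmodified Spark
  // schema to SparkParquetWriters.buildWriter, as callers already do today
  static FileAppender<InternalRow> newAppender(
      OutputFile file, Schema icebergSchema, StructType dfSchema) throws IOException {
    return Parquet.write(file)
        .schema(icebergSchema)
        .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dfSchema, msgType))
        .build();
  }
}
```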