snazy commented on code in PR #12298:
URL: https://github.com/apache/iceberg/pull/12298#discussion_r2001211823


##########
core/src/main/java/org/apache/iceberg/avro/Avro.java:
##########
@@ -287,14 +405,17 @@ CodecFactory codec() {
     }
   }
 
+  @Deprecated

Review Comment:
   Are these just deprecated or deprecated-for-removal?



##########
core/src/main/java/org/apache/iceberg/avro/Avro.java:
##########
@@ -99,92 +121,187 @@ public static WriteBuilder write(OutputFile file) {
     return new WriteBuilder(file);
   }
 
+  @Deprecated
   public static WriteBuilder write(EncryptedOutputFile file) {
     return new WriteBuilder(file.encryptingOutputFile());
   }
 
-  public static class WriteBuilder implements InternalData.WriteBuilder {
+  public static <E> AppenderBuilder<E> appender(EncryptedOutputFile file) {
+    Preconditions.checkState(
+        !(file instanceof NativeEncryptionOutputFile), "Native Avro encryption 
is not supported");
+    return new AppenderBuilder<>(file.encryptingOutputFile());
+  }
+
+  public static <E> AppenderBuilder<E> appender(OutputFile file) {
+    return new AppenderBuilder<>(file);
+  }
+
+  @Deprecated
+  public static class WriteBuilder extends 
AppenderBuilderInternal<WriteBuilder, Object> {
+    private WriteBuilder(OutputFile file) {
+      super(file);
+    }
+  }
+
+  public static class AppenderBuilder<E> extends 
AppenderBuilderInternal<AppenderBuilder<E>, E> {
+    private AppenderBuilder(OutputFile file) {
+      super(file);
+    }
+  }
+
+  /** Will be removed when the {@link WriteBuilder} is removed. */
+  @SuppressWarnings("unchecked")
+  static class AppenderBuilderInternal<B extends AppenderBuilderInternal<B, 
E>, E>
+      implements InternalData.WriteBuilder, DataFileAppenderBuilder<B, E> {
     private final OutputFile file;
     private final Map<String, String> config = Maps.newHashMap();
     private final Map<String, String> metadata = Maps.newLinkedHashMap();
     private org.apache.iceberg.Schema schema = null;
     private String name = "table";
     private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private BiFunction<Schema, E, DatumWriter<?>> writerFunction = null;
+    private BiFunction<Schema, E, DatumWriter<?>> deleteRowWriterFunction = 
null;
     private boolean overwrite;
     private MetricsConfig metricsConfig;
     private Function<Map<String, String>, Context> createContextFunc = 
Context::dataContext;
+    private E engineSchema;
 
-    private WriteBuilder(OutputFile file) {
+    private AppenderBuilderInternal(OutputFile file) {
       this.file = file;
     }
 
-    public WriteBuilder forTable(Table table) {
+    @Deprecated
+    public B forTable(Table table) {
       schema(table.schema());
       setAll(table.properties());
       metricsConfig(MetricsConfig.forTable(table));
-      return this;
+      return (B) this;
     }
 
     @Override
-    public WriteBuilder schema(org.apache.iceberg.Schema newSchema) {
+    public B schema(org.apache.iceberg.Schema newSchema) {
       this.schema = newSchema;
-      return this;
+      return (B) this;
     }
 
     @Override
-    public WriteBuilder named(String newName) {
+    public B named(String newName) {
       this.name = newName;
-      return this;
+      return (B) this;
     }
 
-    public WriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> 
writerFunction) {
-      this.createWriterFunc = writerFunction;
-      return this;
+    public B createWriterFunc(Function<Schema, DatumWriter<?>> 
newWriterFunction) {
+      Preconditions.checkState(
+          writerFunction == null && deleteRowWriterFunction == null,
+          "Cannot set multiple writer builder functions");
+      this.createWriterFunc = newWriterFunction;
+      return (B) this;
+    }
+
+    public B writerFunction(BiFunction<Schema, E, DatumWriter<?>> 
newWriterFunction) {
+      Preconditions.checkState(
+          createWriterFunc == null, "Cannot set multiple writer builder 
functions");
+      this.writerFunction = newWriterFunction;
+      return (B) this;
+    }
+
+    public B deleteRowWriterFunction(BiFunction<Schema, E, DatumWriter<?>> 
newWriterFunction) {
+      Preconditions.checkState(
+          createWriterFunc == null, "Cannot set multiple writer builder 
functions");
+      this.deleteRowWriterFunction = newWriterFunction;
+      return (B) this;
     }
 
     @Override
-    public WriteBuilder set(String property, String value) {
+    public B set(String property, String value) {
       config.put(property, value);
-      return this;
+      return (B) this;
     }
 
-    public WriteBuilder setAll(Map<String, String> properties) {
+    @Deprecated
+    public B setAll(Map<String, String> properties) {
       config.putAll(properties);
-      return this;
+      return (B) this;
     }
 
     @Override
-    public WriteBuilder meta(String property, String value) {
+    public B meta(String property, String value) {
       metadata.put(property, value);
-      return this;
+      return (B) this;
     }
 
     @Override
-    public WriteBuilder meta(Map<String, String> properties) {
+    public B meta(Map<String, String> properties) {
       metadata.putAll(properties);
-      return this;
+      return (B) this;
     }
 
-    public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+    @Override
+    public B metricsConfig(MetricsConfig newMetricsConfig) {
       this.metricsConfig = newMetricsConfig;
-      return this;
+      return (B) this;
     }
 
     @Override
-    public WriteBuilder overwrite() {
+    public B overwrite() {
       return overwrite(true);
     }
 
-    public WriteBuilder overwrite(boolean enabled) {
+    @Override
+    public B overwrite(boolean enabled) {
       this.overwrite = enabled;
-      return this;
+      return (B) this;
     }
 
     // supposed to always be a private method used strictly by data and delete 
write builders
-    private WriteBuilder createContextFunc(
-        Function<Map<String, String>, Context> newCreateContextFunc) {
+    // protected because of inheritance until deprecation of the WriteBuilder

Review Comment:
   super-nit
   ```suggestion
       // package-protected because of inheritance until deprecation of the 
WriteBuilder
   ```



##########
data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java:
##########
@@ -50,62 +49,58 @@ class GenericFileWriterFactory extends 
BaseFileWriterFactory<Record> {
     super(
         table,
         dataFileFormat,
+        DataFileToObjectModelRegistry.GENERIC_OBJECT_MODEL,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
+        positionDeleteRowSchema,
+        ImmutableMap.of(),
+        dataSchema,
+        equalityDeleteRowSchema,
         positionDeleteRowSchema);
   }
 
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
 
-  @Override
   protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(DataWriter::create);
+    throw new UnsupportedOperationException("Deprecated");

Review Comment:
   Maybe add some `... use Xyz` to the message?



##########
core/src/main/java/org/apache/iceberg/io/datafile/WriterBuilderBase.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io.datafile;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+
+/**
+ * Builder for generating one of the following:
+ *
+ * <ul>
+ *   <li>{@link FileAppender}
+ *   <li>{@link DataWriter}
+ *   <li>{@link EqualityDeleteWriter}
+ *   <li>{@link PositionDeleteWriter}
+ * </ul>
+ *
+ * @param <B> type of the builder
+ * @param <E> engine specific schema of the input records used for appender 
initialization
+ */
+interface WriterBuilderBase<B extends WriterBuilderBase<B, E>, E> {

Review Comment:
   Wonder if this interface should be `public`, or its functions explicitly 
overridden in the `public` interfaces that extend this one - to avoid potential 
visibility issues.



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkObjectModels.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
+
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.datafile.DataFileToObjectModelRegistry;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+
+public class FlinkObjectModels {
+  public static final String FLINK_OBJECT_MODEL = "flink";
+
+  public static void register() {

Review Comment:
   Just wondering what happens if people (unknowingly) call these `register()` 
functions...
   
   Maybe guard this (and the other auto-registered ones) with something like 
this?
   
   ```suggestion
     private static boolean registered;
   
     public static synchronized void register() {
       if (registered) {
         return;
       }
       registered = true;
   
   ```
   
   Or move the `DataFileToObjectModelRegistry.register*()` invocations to a 
`static {}` initializer in a static-inner-class so it's guaranteed by the JVM 
that it's only called once.



##########
core/src/main/java/org/apache/iceberg/io/datafile/DataFileToObjectModelRegistry.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io.datafile;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry which provides the available {@link ReadBuilder}s and writer 
builders ({@link
+ * AppenderBuilder}, {@link DataWriterBuilder}, {@link 
EqualityDeleteWriterBuilder}, {@link
+ * PositionDeleteWriterBuilder}). Based on the `file format` and the requested 
`object model name`
+ * the registry returns the correct reader and writer builders. These builders 
could be used to
+ * generate the readers and writers.
+ *
+ * <p>File formats has to register the {@link ReadBuilder}s and the {@link 
DataFileAppenderBuilder}s
+ * which will be used to create the readers and the writers. The readers 
returned directly, the
+ * appenders are wrapped into the {@link AppenderBuilder}, {@link 
DataWriterBuilder}, {@link
+ * EqualityDeleteWriterBuilder} or {@link PositionDeleteWriterBuilder}.
+ */
+public final class DataFileToObjectModelRegistry {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileToObjectModelRegistry.class);
+  // The list of classes which are used for registering the reader and writer 
builders
+  private static final List<String> CLASSES_TO_REGISTER =
+      ImmutableList.of(
+          "org.apache.iceberg.parquet.Parquet",
+          "org.apache.iceberg.orc.ORC",
+          "org.apache.iceberg.arrow.vectorized.ArrowReader",
+          "org.apache.iceberg.flink.data.FlinkObjectModels",
+          "org.apache.iceberg.spark.source.SparkObjectModels");
+
+  private static final Map<Key, Function<EncryptedOutputFile, 
DataFileAppenderBuilder<?, ?>>>
+      APPENDER_BUILDERS = Maps.newConcurrentMap();
+  private static final Map<Key, Function<InputFile, ReadBuilder<?>>> 
READ_BUILDERS =
+      Maps.newConcurrentMap();
+
+  public static final String GENERIC_OBJECT_MODEL = "generic";
+
+  /**
+   * Registers a new appender builder for the given format/object model name.
+   *
+   * @param format the file format to write
+   * @param objectModelName accepted by the writer
+   * @param appenderBuilder the appender builder function
+   * @throws IllegalArgumentException if an appender builder for the given key 
already exists
+   */
+  public static void registerAppender(
+      FileFormat format,
+      String objectModelName,
+      Function<EncryptedOutputFile, DataFileAppenderBuilder<?, ?>> 
appenderBuilder) {
+    Key key = new Key(format, objectModelName);
+    if (APPENDER_BUILDERS.containsKey(key)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Appender builder %s clashes with %s. Both serves %s",
+              appenderBuilder.getClass(), APPENDER_BUILDERS.get(key), key));
+    }
+
+    APPENDER_BUILDERS.put(key, appenderBuilder);
+  }
+
+  /**
+   * Registers a new reader builder for the given format/object model name.
+   *
+   * @param format the file format to read
+   * @param objectModelName returned by the reader
+   * @param readBuilder the read builder function
+   * @throws IllegalArgumentException if a read builder for the given key 
already exists

Review Comment:
   ```suggestion
   * @throws IllegalArgumentException if a read builder for the given 
{@code format} and {@code objectModelName} combination already exists
   ```



##########
core/src/main/java/org/apache/iceberg/io/datafile/WriteBuilder.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io.datafile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
+
+/**
+ * Builder implementation for generating the different writer interfaces. The 
builder is an internal
+ * class and could change without notice. Use one of the following specific 
interfaces:
+ *
+ * <ul>
+ *   <li>{@link FileAppender}
+ *   <li>{@link DataWriter}
+ *   <li>{@link EqualityDeleteWriter}
+ *   <li>{@link PositionDeleteWriter}
+ * </ul>
+ *
+ * The builder wraps the file format specific {@link DataFileAppenderBuilder}. 
To allow further
+ * engine and file format specific configuration changes for the given writer 
the {@link
+ * DataFileAppenderBuilder#build(DataFileAppenderBuilder.WriteMode)} method is 
called with the
+ * correct parameter to create the appender used internally to provide the 
required functionality.
+ *
+ * @param <A> type of the appender
+ * @param <E> engine specific schema of the input records used for appender 
initialization
+ */
+@SuppressWarnings("unchecked")
+class WriteBuilder<B extends WriteBuilder<B, A, E>, A extends 
DataFileAppenderBuilder<A, E>, E>

Review Comment:
   Nit: rename to `WriteBuilderImpl` to distinguish it e.g. from the public 
`ReadBuilder` interface.



##########
core/src/main/java/org/apache/iceberg/io/datafile/DataFileToObjectModelRegistry.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io.datafile;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry which provides the available {@link ReadBuilder}s and writer 
builders ({@link
+ * AppenderBuilder}, {@link DataWriterBuilder}, {@link 
EqualityDeleteWriterBuilder}, {@link
+ * PositionDeleteWriterBuilder}). Based on the `file format` and the requested 
`object model name`
+ * the registry returns the correct reader and writer builders. These builders 
could be used to
+ * generate the readers and writers.
+ *
+ * <p>File formats has to register the {@link ReadBuilder}s and the {@link 
DataFileAppenderBuilder}s
+ * which will be used to create the readers and the writers. The readers 
returned directly, the
+ * appenders are wrapped into the {@link AppenderBuilder}, {@link 
DataWriterBuilder}, {@link
+ * EqualityDeleteWriterBuilder} or {@link PositionDeleteWriterBuilder}.
+ */
+public final class DataFileToObjectModelRegistry {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileToObjectModelRegistry.class);
+  // The list of classes which are used for registering the reader and writer 
builders
+  private static final List<String> CLASSES_TO_REGISTER =
+      ImmutableList.of(
+          "org.apache.iceberg.parquet.Parquet",
+          "org.apache.iceberg.orc.ORC",
+          "org.apache.iceberg.arrow.vectorized.ArrowReader",
+          "org.apache.iceberg.flink.data.FlinkObjectModels",
+          "org.apache.iceberg.spark.source.SparkObjectModels");
+
+  private static final Map<Key, Function<EncryptedOutputFile, 
DataFileAppenderBuilder<?, ?>>>
+      APPENDER_BUILDERS = Maps.newConcurrentMap();
+  private static final Map<Key, Function<InputFile, ReadBuilder<?>>> 
READ_BUILDERS =
+      Maps.newConcurrentMap();
+
+  public static final String GENERIC_OBJECT_MODEL = "generic";
+
+  /**
+   * Registers a new appender builder for the given format/object model name.
+   *
+   * @param format the file format to write
+   * @param objectModelName accepted by the writer
+   * @param appenderBuilder the appender builder function
+   * @throws IllegalArgumentException if an appender builder for the given key 
already exists
+   */
+  public static void registerAppender(
+      FileFormat format,
+      String objectModelName,
+      Function<EncryptedOutputFile, DataFileAppenderBuilder<?, ?>> 
appenderBuilder) {
+    Key key = new Key(format, objectModelName);
+    if (APPENDER_BUILDERS.containsKey(key)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Appender builder %s clashes with %s. Both serves %s",
+              appenderBuilder.getClass(), APPENDER_BUILDERS.get(key), key));
+    }
+
+    APPENDER_BUILDERS.put(key, appenderBuilder);
+  }
+
+  /**
+   * Registers a new reader builder for the given format/object model name.
+   *
+   * @param format the file format to read
+   * @param objectModelName returned by the reader
+   * @param readBuilder the read builder function
+   * @throws IllegalArgumentException if a read builder for the given key 
already exists
+   */
+  public static void registerReader(
+      FileFormat format, String objectModelName, Function<InputFile, 
ReadBuilder<?>> readBuilder) {
+    Key key = new Key(format, objectModelName);
+    if (READ_BUILDERS.containsKey(key)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Read builder %s clashes with %s. Both serves %s",
+              readBuilder.getClass(), READ_BUILDERS.get(key), key));
+    }
+
+    READ_BUILDERS.put(new Key(format, objectModelName), readBuilder);
+  }
+
+  @SuppressWarnings("CatchBlockLogException")
+  private static void registerSupportedFormats() {
+    Avro.register();
+
+    // Uses dynamic methods to call the `register` for the listed classes
+    for (String classToRegister : CLASSES_TO_REGISTER) {
+      try {
+        DynMethods.StaticMethod register =
+            
DynMethods.builder("register").impl(classToRegister).buildStaticChecked();
+
+        register.invoke();
+
+      } catch (NoSuchMethodException e) {
+        // failing to register readers/writers is normal and does not require 
a stack trace
+        LOG.info("Unable to register {} for data files: {}", classToRegister, 
e.getMessage());
+      }
+    }
+  }
+
+  static {
+    registerSupportedFormats();
+  }
+
+  private DataFileToObjectModelRegistry() {}
+
+  /**
+   * Provides a reader builder for the given input file which returns objects 
with a given object
+   * model name.
+   *
+   * @param format of the file to read
+   * @param objectModelName returned by the reader
+   * @param inputFile to read
+   * @return {@link ReadBuilder} for building the actual reader
+   */
+  public static ReadBuilder<?> readBuilder(
+      FileFormat format, String objectModelName, InputFile inputFile) {
+    return READ_BUILDERS.get(new Key(format, 
objectModelName)).apply(inputFile);
+  }
+
+  /**
+   * Provides an appender builder for the given output file which writes 
objects with a given object
+   * model name.
+   *
+   * @param format of the file to write
+   * @param objectModelName accepted by the writer
+   * @param outputFile to write
+   * @param <E> type for the engine specific schema used by the builder
+   * @return {@link ReadBuilder} for building the actual reader
+   */
+  public static <E> AppenderBuilder<?, E> appenderBuilder(
+      FileFormat format, String objectModelName, EncryptedOutputFile 
outputFile) {
+    return writerFor(format, objectModelName, outputFile);
+  }
+
+  /**
+   * Provides a data writer builder for the given output file which writes 
objects with a given
+   * object model name.
+   *
+   * @param format of the file to write
+   * @param objectModelName accepted by the writer
+   * @param outputFile to write
+   * @param <E> type for the engine specific schema used by the builder
+   * @return {@link ReadBuilder} for building the actual reader
+   */
+  public static <E> DataWriterBuilder<?, E> writerBuilder(
+      FileFormat format, String objectModelName, EncryptedOutputFile 
outputFile) {
+    return writerFor(format, objectModelName, outputFile);
+  }
+
+  /**
+   * Provides an equality delete writer builder for the given output file 
which writes objects with
+   * a given object model name.
+   *
+   * @param format of the file to write
+   * @param objectModelName accepted by the writer
+   * @param outputFile to write
+   * @param <E> type for the engine specific schema used by the builder
+   * @return {@link ReadBuilder} for building the actual reader
+   */
+  public static <E> EqualityDeleteWriterBuilder<?, E> 
equalityDeleteWriterBuilder(
+      FileFormat format, String objectModelName, EncryptedOutputFile 
outputFile) {
+    return writerFor(format, objectModelName, outputFile);
+  }
+
+  /**
+   * Provides a position delete writer builder for the given output file which 
writes objects with a
+   * given object model name.
+   *
+   * @param format of the file to write
+   * @param objectModelName accepted by the writer
+   * @param outputFile to write
+   * @param <E> type for the engine specific schema used by the builder
+   * @return {@link ReadBuilder} for building the actual reader
+   */
+  public static <E> PositionDeleteWriterBuilder<?, E> 
positionDeleteWriterBuilder(
+      FileFormat format, String objectModelName, EncryptedOutputFile 
outputFile) {
+    return writerFor(format, objectModelName, outputFile);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <E> WriteBuilder<?, ?, E> writerFor(
+      FileFormat format, String objectModelName, EncryptedOutputFile 
outputFile) {
+    return new WriteBuilder<>(
+        (DataFileAppenderBuilder<?, E>)
+            APPENDER_BUILDERS.get(new Key(format, 
objectModelName)).apply(outputFile),
+        outputFile.encryptingOutputFile().location(),
+        format);
+  }
+
+  /** Key used to identify readers and writers in the {@link 
DataFileToObjectModelRegistry}. */
+  private static class Key {

Review Comment:
   Maybe just
   ```java
   record Key(FileFormat fileFormat, String objectModelName) {}
   ```



##########
core/src/main/java/org/apache/iceberg/io/datafile/DataFileToObjectModelRegistry.java:
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.io.datafile;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Registry which provides the available {@link ReadBuilder}s and writer 
builders ({@link
+ * AppenderBuilder}, {@link DataWriterBuilder}, {@link 
EqualityDeleteWriterBuilder}, {@link
+ * PositionDeleteWriterBuilder}). Based on the `file format` and the requested 
`object model name`
+ * the registry returns the correct reader and writer builders. These builders 
could be used to
+ * generate the readers and writers.
+ *
+ * <p>File formats has to register the {@link ReadBuilder}s and the {@link 
DataFileAppenderBuilder}s
+ * which will be used to create the readers and the writers. The readers 
returned directly, the
+ * appenders are wrapped into the {@link AppenderBuilder}, {@link 
DataWriterBuilder}, {@link
+ * EqualityDeleteWriterBuilder} or {@link PositionDeleteWriterBuilder}.
+ */
+public final class DataFileToObjectModelRegistry {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataFileToObjectModelRegistry.class);
+  // The list of classes which are used for registering the reader and writer 
builders
+  private static final List<String> CLASSES_TO_REGISTER =
+      ImmutableList.of(
+          "org.apache.iceberg.parquet.Parquet",
+          "org.apache.iceberg.orc.ORC",
+          "org.apache.iceberg.arrow.vectorized.ArrowReader",
+          "org.apache.iceberg.flink.data.FlinkObjectModels",
+          "org.apache.iceberg.spark.source.SparkObjectModels");
+
+  private static final Map<Key, Function<EncryptedOutputFile, 
DataFileAppenderBuilder<?, ?>>>
+      APPENDER_BUILDERS = Maps.newConcurrentMap();
+  private static final Map<Key, Function<InputFile, ReadBuilder<?>>> 
READ_BUILDERS =
+      Maps.newConcurrentMap();
+
+  public static final String GENERIC_OBJECT_MODEL = "generic";
+
+  /**
+   * Registers a new appender builder for the given format/object model name.
+   *
+   * @param format the file format to write
+   * @param objectModelName accepted by the writer
+   * @param appenderBuilder the appender builder function
+   * @throws IllegalArgumentException if an appender builder for the given key 
already exists

Review Comment:
   ```suggestion
      * @throws IllegalArgumentException if an appender builder for the given 
{@code format} and {@code objectModelName} combination already exists
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to