pvary commented on code in PR #12298: URL: https://github.com/apache/iceberg/pull/12298#discussion_r1961197640
########## core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java: ########## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which maintains the available {@link ReaderService} and {@link WriterService} + * implementations. Based on the `file format`, the required `data type` and the reader/writer + * `builderType` the registry returns the correct reader and writer service implementations. These + * services could be used to generate the correct reader and writer builders. + */ +public final class DataFileServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class); + private static final Map<Key, ReaderService> READ_BUILDERS = Maps.newConcurrentMap(); + private static final Map<Key, WriterService<?>> WRITE_BUILDERS = Maps.newConcurrentMap(); + + static { + for (ReaderService service : ServiceLoader.load(ReaderService.class)) { + if (READ_BUILDERS.containsKey(service.key())) { + throw new IllegalArgumentException( + String.format( + "Read service %s clashes with %s. Both serves %s", + service.getClass(), READ_BUILDERS.get(service.key()), service.key())); + } + + READ_BUILDERS.putIfAbsent(service.key(), service); + } + + for (WriterService<?> service : ServiceLoader.load(WriterService.class)) { + if (WRITE_BUILDERS.containsKey(service.key())) { + throw new IllegalArgumentException( + String.format( + "Write service %s clashes with %s. Both serves %s", + service.getClass(), WRITE_BUILDERS.get(service.key()), service.key())); + } + + WRITE_BUILDERS.putIfAbsent(service.key(), service); + } + + LOG.info("DataFileServices found: readers={}, writers={}", READ_BUILDERS, WRITE_BUILDERS); + } + + private DataFileServiceRegistry() {} + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder read( + FileFormat format, String returnType, InputFile inputFile, Schema readSchema) { + return read(format, returnType, null, inputFile, readSchema, ImmutableMap.of(), null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder read( + FileFormat format, + String returnType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant) { + return read(format, returnType, null, inputFile, readSchema, idToConstant, null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param builderType selects the builder when there are multiple builders for the same format and + * return type + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder read( + FileFormat format, + String returnType, + String builderType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter) { + return READ_BUILDERS + .get(new Key(format, returnType, builderType)) + .builder(inputFile, readSchema, idToConstant, deleteFilter); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return appenderBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return ((WriterService<S>) WRITE_BUILDERS.get(new Key(format, inputType, builderType))) + .appenderBuilder(outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return dataWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return ((WriterService<S>) WRITE_BUILDERS.get(new Key(format, inputType, builderType))) + .dataWriterBuilder(outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return equalityDeleteWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return ((WriterService<S>) WRITE_BUILDERS.get(new Key(format, inputType, builderType))) + .equalityDeleteWriterBuilder(outputFile, rowType); + } + + /** + * Provides a positional delete writer builder for the given input file which writes objects with + * a given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends PositionDeleteWriterBuilder<B>> + PositionDeleteWriterBuilder<B> positionDeleteWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return positionDeleteWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a positional delete writer builder for the given input file which writes objects with + * a given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends PositionDeleteWriterBuilder<B>> + PositionDeleteWriterBuilder<B> positionDeleteWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return ((WriterService<S>) WRITE_BUILDERS.get(new Key(format, inputType, builderType))) + .positionDeleteWriterBuilder(outputFile, rowType); + } + + /** + * Service building readers. Implementations should be registered through the {@link + * java.util.ServiceLoader}. {@link DataFileServiceRegistry} is used to collect and serve the + * reader implementations. + */ + public interface ReaderService { + /** Returns the file format which is handled by the service. */ + Key key(); + + /** + * Provides a reader for the given input file which returns objects with a given type. + * + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant used to generate constant values + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + ReaderBuilder builder( + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter); + } + + /** + * Service building writers. Implementations should be registered through the {@link + * java.util.ServiceLoader}. {@link DataFileServiceRegistry} is used to collect and serve the + * writer implementations. + */ + public interface WriterService<S> { + /** Returns the file format which is handled by the service. */ + Key key(); + + /** + * Provides an appender builder for the given output file which writes objects with a given + * input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + AppenderBuilder appenderBuilder(EncryptedOutputFile outputFile, S rowType); + + /** + * Provides a data writer builder for the given output file which writes objects with a given + * input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + DataWriterBuilder dataWriterBuilder(EncryptedOutputFile outputFile, S rowType); + + /** + * Provides an equality delete writer builder for the given output file which writes objects + * with a given input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + <T extends EqualityDeleteWriterBuilder<T>> + EqualityDeleteWriterBuilder<T> equalityDeleteWriterBuilder( + EncryptedOutputFile outputFile, S rowType); + + /** + * Provides a positional delete writer builder for the given output file which writes objects + * with a given input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + <T extends PositionDeleteWriterBuilder<T>> + PositionDeleteWriterBuilder<T> positionDeleteWriterBuilder( + EncryptedOutputFile outputFile, S rowType); + } + + /** Key used to identify readers and writers in the {@link DataFileServiceRegistry}. */ + public static class Key { + private final FileFormat fileFormat; + private final String dataType; + private final String builderType; + + /** + * Create the key when there are no multiple builder types available. + * + * @param fileFormat the service handles + * @param dataType the service handles + */ + public Key(FileFormat fileFormat, String dataType) { + this.fileFormat = fileFormat; + this.dataType = dataType; + this.builderType = null; + } + + /** + * Create the key when there are multiple builder types available. + * + * @param fileFormat the service handles + * @param dataType the service handles + * @param builderType identifier of the builder type when multiple readers/writers are available + */ + public Key(FileFormat fileFormat, String dataType, String builderType) { + this.fileFormat = fileFormat; + this.dataType = dataType; + this.builderType = builderType; Review Comment: We have a concrete example for this: Comet vectorized parquet reader `spark.sql.iceberg.parquet.reader-type` I think it is good if the reader/writer choice is a conscious decision, and not happening based on some behind the scenes algorithm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org