liurenjie1024 commented on code in PR #12298: URL: https://github.com/apache/iceberg/pull/12298#discussion_r1965202207
########## core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java: ########## @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which maintains the available {@link ReaderService} and {@link WriterService} + * implementations. Based on the `file format`, the required `data type` and the reader/writer + * `builderType` the registry returns the correct reader and writer service implementations. These + * services could be used to generate the correct reader and writer builders. 
+ */ +public final class DataFileServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class); + + private DataFileServiceRegistry() {} + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, String returnType, InputFile inputFile, Schema readSchema) { + return readerBuilder(format, returnType, null, inputFile, readSchema, ImmutableMap.of(), null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant) { + return readerBuilder(format, returnType, null, inputFile, readSchema, idToConstant, null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. 
+ * + * @param format of the file to read + * @param returnType returned by the reader + * @param builderType selects the builder when there are multiple builders for the same format and + * return type + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + String builderType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter) { + return Registry.readerBuilderFor(format, returnType, builderType) + .builder(inputFile, readSchema, idToConstant, deleteFilter); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return appenderBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .appenderBuilder(outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return dataWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .dataWriterBuilder(outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( Review Comment: I don't think the file format should need to know about equality deletes/position deletes here. ########## core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java: ########## @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which maintains the available {@link ReaderService} and {@link WriterService} + * implementations. Based on the `file format`, the required `data type` and the reader/writer + * `builderType` the registry returns the correct reader and writer service implementations. These + * services could be used to generate the correct reader and writer builders. + */ +public final class DataFileServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class); + + private DataFileServiceRegistry() {} + + /** + * Provides a reader for the given input file which returns objects with a given returnType. 
+ * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, String returnType, InputFile inputFile, Schema readSchema) { + return readerBuilder(format, returnType, null, inputFile, readSchema, ImmutableMap.of(), null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant) { + return readerBuilder(format, returnType, null, inputFile, readSchema, idToConstant, null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. 
+ * + * @param format of the file to read + * @param returnType returned by the reader + * @param builderType selects the builder when there are multiple builders for the same format and + * return type + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + String builderType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter) { + return Registry.readerBuilderFor(format, returnType, builderType) + .builder(inputFile, readSchema, idToConstant, deleteFilter); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return appenderBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .appenderBuilder(outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( Review Comment: I don't quite understand in which case this is needed. Wouldn't the appender be enough? ########## core/src/main/java/org/apache/iceberg/io/datafile/ReaderBuilder.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.nio.ByteBuffer; +import org.apache.iceberg.InternalData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mapping.NameMapping; + +/** Builder API for reading Iceberg data files. */ +public interface ReaderBuilder extends InternalData.ReadBuilder { + /** + * Restricts the read to the given range: [start, start + length). + * + * @param newStart the start position for this read + * @param newLength the length of the range this read should scan + */ + @Override + ReaderBuilder split(long newStart, long newLength); + + /** Read only the given columns. */ + @Override + ReaderBuilder project(Schema newSchema); + + /** Sets the reader to case-sensitive when matching column names. */ + default ReaderBuilder caseInsensitive() { + return caseSensitive(false); + } + + default ReaderBuilder caseSensitive(boolean newCaseSensitive) { + // Just ignore case sensitivity if not available + return this; + } + + /** Enables record filtering. */ + default ReaderBuilder filterRecords(boolean newFilterRecords) { + // Skip filtering if not available + return this; + } + + /** + * Pushes down the {@link Expression} filter for the reader to prevent reading unnecessary + * records. + */ + default ReaderBuilder filter(Expression newFilter) { + // Skip filtering if not available + return this; + } + + /** Sets configuration key/value pairs for the reader. */ + default ReaderBuilder set(String key, String value) { + throw new UnsupportedOperationException("Not supported"); + } + + /** Enables reusing the containers returned by the reader. Decreases pressure on GC. */ + @Override + default ReaderBuilder reuseContainers() { Review Comment: Seems it should not be here? 
These are parquet reader specific. ########## core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java: ########## @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which maintains the available {@link ReaderService} and {@link WriterService} + * implementations. Based on the `file format`, the required `data type` and the reader/writer + * `builderType` the registry returns the correct reader and writer service implementations. These + * services could be used to generate the correct reader and writer builders. 
+ */ +public final class DataFileServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class); + + private DataFileServiceRegistry() {} + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, String returnType, InputFile inputFile, Schema readSchema) { + return readerBuilder(format, returnType, null, inputFile, readSchema, ImmutableMap.of(), null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant) { + return readerBuilder(format, returnType, null, inputFile, readSchema, idToConstant, null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. 
+ * + * @param format of the file to read + * @param returnType returned by the reader + * @param builderType selects the builder when there are multiple builders for the same format and + * return type + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + String builderType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter) { + return Registry.readerBuilderFor(format, returnType, builderType) + .builder(inputFile, readSchema, idToConstant, deleteFilter); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return appenderBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .appenderBuilder(outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return dataWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .dataWriterBuilder(outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return equalityDeleteWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .equalityDeleteWriterBuilder(outputFile, rowType); + } + + /** + * Provides a positional delete writer builder for the given input file which writes objects with + * a given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends PositionDeleteWriterBuilder<B>> + PositionDeleteWriterBuilder<B> positionDeleteWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return positionDeleteWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a positional delete writer builder for the given input file which writes objects with + * a given inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends PositionDeleteWriterBuilder<B>> + PositionDeleteWriterBuilder<B> positionDeleteWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .positionDeleteWriterBuilder(outputFile, rowType); + } + + /** + * Internal class providing the actual registry. This is a separate class to avoid class loader + * issues. + */ + private static final class Registry { + private static final Map<Key, ReaderService> READ_BUILDERS = Maps.newConcurrentMap(); + private static final Map<Key, WriterService<?>> WRITE_BUILDERS = Maps.newConcurrentMap(); + + static { + for (ReaderService service : ServiceLoader.load(ReaderService.class)) { + if (READ_BUILDERS.containsKey(service.key())) { + throw new IllegalArgumentException( + String.format( + "Read service %s clashes with %s. Both serves %s", + service.getClass(), READ_BUILDERS.get(service.key()), service.key())); + } + + READ_BUILDERS.putIfAbsent(service.key(), service); + } + + for (WriterService<?> service : ServiceLoader.load(WriterService.class)) { + if (WRITE_BUILDERS.containsKey(service.key())) { + throw new IllegalArgumentException( + String.format( + "Write service %s clashes with %s. 
Both serves %s", + service.getClass(), WRITE_BUILDERS.get(service.key()), service.key())); + } + + WRITE_BUILDERS.putIfAbsent(service.key(), service); + } + + LOG.info("DataFileServices found: readers={}, writers={}", READ_BUILDERS, WRITE_BUILDERS); + } + + private static ReaderService readerBuilderFor( + FileFormat format, String inputType, String builderType) { + Key key = new Key(format, inputType, builderType); + ReaderService service = READ_BUILDERS.get(key); + if (service == null) { + throw new IllegalArgumentException( + String.format("No reader builder registered for key %s", key)); + } + + return service; + } + + private static <S> WriterService<S> writeBuilderFor( + FileFormat format, String inputType, String builderType) { + Key key = new Key(format, inputType, builderType); + WriterService<S> service = (WriterService<S>) WRITE_BUILDERS.get(key); + if (service == null) { + throw new IllegalArgumentException( + String.format("No writer builder registered for key %s", key)); + } + + return service; + } + } + + /** + * Service building readers. Implementations should be registered through the {@link + * java.util.ServiceLoader}. {@link DataFileServiceRegistry} is used to collect and serve the + * reader implementations. + */ + public interface ReaderService { + /** Returns the file format which is handled by the service. */ + Key key(); + + /** + * Provides a reader for the given input file which returns objects with a given type. + * + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant used to generate constant values + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + ReaderBuilder builder( + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter); + } + + /** + * Service building writers. 
Implementations should be registered through the {@link + * java.util.ServiceLoader}. {@link DataFileServiceRegistry} is used to collect and serve the + * writer implementations. + */ + public interface WriterService<S> { + /** Returns the file format which is handled by the service. */ + Key key(); + + /** + * Provides an appender builder for the given output file which writes objects with a given + * input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + AppenderBuilder appenderBuilder(EncryptedOutputFile outputFile, S rowType); + + /** + * Provides a data writer builder for the given output file which writes objects with a given + * input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + DataWriterBuilder dataWriterBuilder(EncryptedOutputFile outputFile, S rowType); + + /** + * Provides an equality delete writer builder for the given output file which writes objects + * with a given input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + <T extends EqualityDeleteWriterBuilder<T>> + EqualityDeleteWriterBuilder<T> equalityDeleteWriterBuilder( + EncryptedOutputFile outputFile, S rowType); + + /** + * Provides a positional delete writer builder for the given output file which writes objects + * with a given input type. + * + * @param outputFile to write to + * @param rowType of the input records + */ + <T extends PositionDeleteWriterBuilder<T>> + PositionDeleteWriterBuilder<T> positionDeleteWriterBuilder( + EncryptedOutputFile outputFile, S rowType); + } + + /** Key used to identify readers and writers in the {@link DataFileServiceRegistry}. */ + public static class Key { + private final FileFormat fileFormat; + private final String dataType; Review Comment: Is this for things like `arrow` or `internal row`?
########## core/src/main/java/org/apache/iceberg/io/datafile/DataFileServiceRegistry.java: ########## @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io.datafile; + +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registry which maintains the available {@link ReaderService} and {@link WriterService} + * implementations. Based on the `file format`, the required `data type` and the reader/writer + * `builderType` the registry returns the correct reader and writer service implementations. These + * services could be used to generate the correct reader and writer builders. 
+ */ +public final class DataFileServiceRegistry { + private static final Logger LOG = LoggerFactory.getLogger(DataFileServiceRegistry.class); + + private DataFileServiceRegistry() {} + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, String returnType, InputFile inputFile, Schema readSchema) { + return readerBuilder(format, returnType, null, inputFile, readSchema, ImmutableMap.of(), null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. + * + * @param format of the file to read + * @param returnType returned by the reader + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant) { + return readerBuilder(format, returnType, null, inputFile, readSchema, idToConstant, null); + } + + /** + * Provides a reader for the given input file which returns objects with a given returnType. 
+ * + * @param format of the file to read + * @param returnType returned by the reader + * @param builderType selects the builder when there are multiple builders for the same format and + * return type + * @param inputFile to read + * @param readSchema to use when reading the data file + * @param idToConstant to use for getting value for constant fields + * @param deleteFilter is used when the delete record filtering is pushed down to the reader + * @return {@link ReaderBuilder} for building the actual reader + */ + public static ReaderBuilder readerBuilder( + FileFormat format, + String returnType, + String builderType, + InputFile inputFile, + Schema readSchema, + Map<Integer, ?> idToConstant, + DeleteFilter<?> deleteFilter) { + return Registry.readerBuilderFor(format, returnType, builderType) + .builder(inputFile, readSchema, idToConstant, deleteFilter); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return appenderBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an appender builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual appender + */ + public static <S> AppenderBuilder appenderBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .appenderBuilder(outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return dataWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a data writer builder for the given input file which writes objects with a given + * inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link DataWriterBuilder} for building the actual writer + */ + public static <S> DataWriterBuilder dataWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .dataWriterBuilder(outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return equalityDeleteWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides an equality delete writer builder for the given input file which writes objects with a + * given inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends EqualityDeleteWriterBuilder<B>> + EqualityDeleteWriterBuilder<B> equalityDeleteWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .equalityDeleteWriterBuilder(outputFile, rowType); + } + + /** + * Provides a positional delete writer builder for the given input file which writes objects with + * a given inputType. + * + * @param format of the file to write + * @param inputType of the rows + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends PositionDeleteWriterBuilder<B>> + PositionDeleteWriterBuilder<B> positionDeleteWriterBuilder( + FileFormat format, String inputType, EncryptedOutputFile outputFile, S rowType) { + return positionDeleteWriterBuilder(format, inputType, null, outputFile, rowType); + } + + /** + * Provides a positional delete writer builder for the given input file which writes objects with + * a given inputType. 
+ * + * @param format of the file to write + * @param inputType of the rows + * @param builderType selects the builder when there are multiple builders for the same format and + * input type + * @param outputFile to write + * @param rowType of the native input data + * @return {@link AppenderBuilder} for building the actual writer + */ + public static <S, B extends PositionDeleteWriterBuilder<B>> + PositionDeleteWriterBuilder<B> positionDeleteWriterBuilder( + FileFormat format, + String inputType, + String builderType, + EncryptedOutputFile outputFile, + S rowType) { + return Registry.writeBuilderFor(format, inputType, builderType) + .positionDeleteWriterBuilder(outputFile, rowType); + } + + /** + * Internal class providing the actual registry. This is a separate class to avoid class loader + * issues. + */ + private static final class Registry { + private static final Map<Key, ReaderService> READ_BUILDERS = Maps.newConcurrentMap(); Review Comment: This is more like a convention problem, I think maybe we just need to store in `FileFormatService` in registry? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org