This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 9e972cb0b97 [bugfix](iceberg)Fix the datafile path error issue for 2.1 (#36066) 9e972cb0b97 is described below commit 9e972cb0b97bc4e0b84028a47c8ebd0aedaa0354 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Sat Jun 8 21:51:46 2024 +0800 [bugfix](iceberg)Fix the datafile path error issue for 2.1 (#36066) bp: #35957 --- be/src/vec/exec/format/table/iceberg_reader.cpp | 15 ++----- be/src/vec/exec/format/table/iceberg_reader.h | 3 +- .../docker-compose/iceberg/iceberg.yaml.tpl | 6 +++ .../docker-compose/iceberg/spark-init.sql | 26 ++++++++++++ .../datasource/iceberg/source/IcebergScanNode.java | 4 +- .../datasource/iceberg/source/IcebergSplit.java | 5 ++- gensrc/thrift/PlanNodes.thrift | 1 + .../iceberg/test_iceberg_read_with_posdelete.out | 7 ++++ .../test_iceberg_read_with_posdelete.groovy | 46 ++++++++++++++++++++++ 9 files changed, 98 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 730f7e44aef..d321fc016f4 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -180,7 +180,8 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { } if (position_delete_files.size() > 0) { - RETURN_IF_ERROR(_position_delete_base(position_delete_files)); + RETURN_IF_ERROR( + _position_delete_base(table_desc.original_file_path, position_delete_files)); } if (equality_delete_files.size() > 0) { RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); @@ -293,17 +294,7 @@ Status IcebergTableReader::_shrink_block_if_need(Block* block) { } Status IcebergTableReader::_position_delete_base( - const std::vector<TIcebergDeleteFileDesc>& delete_files) { - std::string data_file_path = _range.path; - // the path in _range is remove the namenode prefix, - // and the file_path in delete file is full path, so we should add it back. - if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) { - std::string fs_name = _params.hdfs_params.fs_name; - if (!starts_with(data_file_path, fs_name)) { - data_file_path = fs_name + data_file_path; - } - } - + const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) { std::vector<DeleteRows*> delete_rows_array; int64_t num_delete_rows = 0; std::vector<DeleteFile*> erase_data; diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index c0992095c83..07fc1baf90f 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -123,7 +123,8 @@ protected: void _gen_new_colname_to_value_range(); static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; } - Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files); + Status _position_delete_base(const std::string data_file_path, + const std::vector<TIcebergDeleteFileDesc>& delete_files); Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files); virtual std::unique_ptr<GenericReader> _create_equality_reader( const TFileRangeDesc& delete_desc) = 0; diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl index bc217c1dd6e..8af2e745c0f 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl @@ -30,10 +30,16 @@ services: - ./data/output/spark-warehouse:/home/iceberg/warehouse - ./data/output/spark-notebooks:/home/iceberg/notebooks/notebooks - ./data:/mnt/data + - ./spark-init.sql:/mnt/spark-init.sql environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " + spark-sql -f /mnt/spark-init.sql 2>&1; + tail -f /dev/null + " networks: - doris--iceberg diff --git a/docker/thirdparties/docker-compose/iceberg/spark-init.sql b/docker/thirdparties/docker-compose/iceberg/spark-init.sql new file mode 100644 index 00000000000..d7479a109eb --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/spark-init.sql @@ -0,0 +1,26 @@ +create database if not exists demo.test_db; +drop table if exists demo.test_db.location_s3a_table; +create table demo.test_db.location_s3a_table ( + id int, + val string +) using iceberg +location 's3a://warehouse/wh/test_db/location_s3a_table' +tblproperties ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read' +); +insert into demo.test_db.location_s3a_table values (1,'a'); +update demo.test_db.location_s3a_table set val='b' where id=1; + +drop table if exists demo.test_db.location_s3_table; +create table demo.test_db.location_s3_table ( + id int, + val string +) using iceberg +location 's3://warehouse/wh/test_db/location_s3_table' +tblproperties ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read' +); +insert into demo.test_db.location_s3_table values (1,'a'); +update demo.test_db.location_s3_table set val='b' where id=1; \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 486e1242d80..25d28b092fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -143,6 +143,7 @@ public class IcebergScanNode extends FileQueryScanNode { TIcebergFileDesc fileDesc = new TIcebergFileDesc(); int formatVersion = icebergSplit.getFormatVersion(); fileDesc.setFormatVersion(formatVersion); + fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); } else { @@ -253,7 +254,8 @@ public class IcebergScanNode extends FileQueryScanNode { new String[0], formatVersion, source.getCatalog().getProperties(), - partitionValues); + partitionValues, + splitTask.file().path().toString()); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index b4ea232c004..d867245dbe3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -28,13 +28,16 @@ import java.util.Map; @Data public class IcebergSplit extends FileSplit { + private final String originalPath; + // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts, Integer formatVersion, Map<String, String> config, - List<String> partitionList) { + List<String> partitionList, String originalPath) { super(file, start, length, fileLength, hosts, partitionList); this.formatVersion = formatVersion; this.config = config; + this.originalPath = originalPath; } private Integer formatVersion; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5f34a261c50..3cb04bda33a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -305,6 +305,7 @@ struct TIcebergFileDesc { 4: optional Types.TTupleId delete_table_tuple_id; // Deprecated 5: optional Exprs.TExpr file_select_conjunct; + 6: optional string original_file_path; } struct TPaimonDeletionFileDesc { diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out new file mode 100644 index 00000000000..6c0db029f19 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !qt1 -- +1 b + +-- !qt2 -- +1 b + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy new file mode 100644 index 00000000000..139a4091218 --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_read_with_delete", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_read_with_delete" + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + qt_qt1 """ select * from ${catalog_name}.test_db.location_s3_table order by id """ + qt_qt2 """ select * from ${catalog_name}.test_db.location_s3a_table order by id """ + + sql """drop catalog if exists ${catalog_name}""" + } finally { + } + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org