This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9e972cb0b97 [bugfix](iceberg)Fix the datafile path error issue for 2.1 
(#36066)
9e972cb0b97 is described below

commit 9e972cb0b97bc4e0b84028a47c8ebd0aedaa0354
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Sat Jun 8 21:51:46 2024 +0800

    [bugfix](iceberg)Fix the datafile path error issue for 2.1 (#36066)
    
    bp: #35957
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 15 ++-----
 be/src/vec/exec/format/table/iceberg_reader.h      |  3 +-
 .../docker-compose/iceberg/iceberg.yaml.tpl        |  6 +++
 .../docker-compose/iceberg/spark-init.sql          | 26 ++++++++++++
 .../datasource/iceberg/source/IcebergScanNode.java |  4 +-
 .../datasource/iceberg/source/IcebergSplit.java    |  5 ++-
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 .../iceberg/test_iceberg_read_with_posdelete.out   |  7 ++++
 .../test_iceberg_read_with_posdelete.groovy        | 46 ++++++++++++++++++++++
 9 files changed, 98 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 730f7e44aef..d321fc016f4 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -180,7 +180,8 @@ Status IcebergTableReader::init_row_filters(const 
TFileRangeDesc& range) {
     }
 
     if (position_delete_files.size() > 0) {
-        RETURN_IF_ERROR(_position_delete_base(position_delete_files));
+        RETURN_IF_ERROR(
+                _position_delete_base(table_desc.original_file_path, 
position_delete_files));
     }
     if (equality_delete_files.size() > 0) {
         RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
@@ -293,17 +294,7 @@ Status IcebergTableReader::_shrink_block_if_need(Block* 
block) {
 }
 
 Status IcebergTableReader::_position_delete_base(
-        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
-    std::string data_file_path = _range.path;
-    // the path in _range is remove the namenode prefix,
-    // and the file_path in delete file is full path, so we should add it back.
-    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
-        std::string fs_name = _params.hdfs_params.fs_name;
-        if (!starts_with(data_file_path, fs_name)) {
-            data_file_path = fs_name + data_file_path;
-        }
-    }
-
+        const std::string data_file_path, const 
std::vector<TIcebergDeleteFileDesc>& delete_files) {
     std::vector<DeleteRows*> delete_rows_array;
     int64_t num_delete_rows = 0;
     std::vector<DeleteFile*> erase_data;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index c0992095c83..07fc1baf90f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -123,7 +123,8 @@ protected:
     void _gen_new_colname_to_value_range();
     static std::string _delet_file_cache_key(const std::string& path) { return 
"delete_" + path; }
 
-    Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>& 
delete_files);
+    Status _position_delete_base(const std::string data_file_path,
+                                 const std::vector<TIcebergDeleteFileDesc>& 
delete_files);
     Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& 
delete_files);
     virtual std::unique_ptr<GenericReader> _create_equality_reader(
             const TFileRangeDesc& delete_desc) = 0;
diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl 
b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
index bc217c1dd6e..8af2e745c0f 100644
--- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
@@ -30,10 +30,16 @@ services:
       - ./data/output/spark-warehouse:/home/iceberg/warehouse
       - ./data/output/spark-notebooks:/home/iceberg/notebooks/notebooks
       - ./data:/mnt/data
+      - ./spark-init.sql:/mnt/spark-init.sql
     environment:
       - AWS_ACCESS_KEY_ID=admin
       - AWS_SECRET_ACCESS_KEY=password
       - AWS_REGION=us-east-1
+    entrypoint:  >
+      /bin/sh -c "
+          spark-sql -f /mnt/spark-init.sql 2>&1;
+          tail -f /dev/null
+      "
     networks:
       - doris--iceberg
 
diff --git a/docker/thirdparties/docker-compose/iceberg/spark-init.sql 
b/docker/thirdparties/docker-compose/iceberg/spark-init.sql
new file mode 100644
index 00000000000..d7479a109eb
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/spark-init.sql
@@ -0,0 +1,26 @@
+create database if not exists demo.test_db;
+drop table if exists demo.test_db.location_s3a_table;
+create table demo.test_db.location_s3a_table (
+    id int,
+    val string
+) using iceberg 
+location 's3a://warehouse/wh/test_db/location_s3a_table'
+tblproperties (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3a_table values (1,'a');
+update demo.test_db.location_s3a_table set val='b' where id=1;
+
+drop table if exists demo.test_db.location_s3_table;
+create table demo.test_db.location_s3_table (
+    id int,
+    val string
+) using iceberg 
+location 's3://warehouse/wh/test_db/location_s3_table'
+tblproperties (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3_table values (1,'a');
+update demo.test_db.location_s3_table set val='b' where id=1;
\ No newline at end of file
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 486e1242d80..25d28b092fb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -143,6 +143,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
         int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
+        fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
@@ -253,7 +254,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                         new String[0],
                         formatVersion,
                         source.getCatalog().getProperties(),
-                        partitionValues);
+                        partitionValues,
+                        splitTask.file().path().toString());
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index b4ea232c004..d867245dbe3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -28,13 +28,16 @@ import java.util.Map;
 @Data
 public class IcebergSplit extends FileSplit {
 
+    private final String originalPath;
+
     // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, 
String[] hosts,
                         Integer formatVersion, Map<String, String> config,
-                        List<String> partitionList) {
+                        List<String> partitionList, String originalPath) {
         super(file, start, length, fileLength, hosts, partitionList);
         this.formatVersion = formatVersion;
         this.config = config;
+        this.originalPath = originalPath;
     }
 
     private Integer formatVersion;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5f34a261c50..3cb04bda33a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -305,6 +305,7 @@ struct TIcebergFileDesc {
     4: optional Types.TTupleId delete_table_tuple_id;
     // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
+    6: optional string original_file_path;
 }
 
 struct TPaimonDeletionFileDesc {
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
new file mode 100644
index 00000000000..6c0db029f19
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !qt1 --
+1      b
+
+-- !qt2 --
+1      b
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
new file mode 100644
index 00000000000..139a4091218
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_read_with_delete", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String rest_port = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
+            String minio_port = 
context.config.otherConfigs.get("iceberg_minio_port")
+            String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
+            String catalog_name = "test_iceberg_read_with_delete"
+
+            sql """drop catalog if exists ${catalog_name}"""
+            sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                    'type'='iceberg',
+                    'iceberg.catalog.type'='rest',
+                    'uri' = 'http://${externalEnvIp}:${rest_port}',
+                    "s3.access_key" = "admin",
+                    "s3.secret_key" = "password",
+                    "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                    "s3.region" = "us-east-1"
+                );"""
+
+            qt_qt1 """ select * from ${catalog_name}.test_db.location_s3_table 
order by id """
+            qt_qt2 """ select * from 
${catalog_name}.test_db.location_s3a_table order by id """
+
+            sql """drop catalog if exists ${catalog_name}"""
+        } finally {
+        }
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to