This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-deltalake
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-deltalake by this push:
     new 53f4fd894b6 [feature-wip](catalog) support deltalake catalog (#25020)
53f4fd894b6 is described below

commit 53f4fd894b61a8bd7fceefd74d89f75d011bc497
Author: Peilin Sun <[email protected]>
AuthorDate: Sat Oct 28 22:25:27 2023 +0800

    [feature-wip](catalog) support deltalake catalog (#25020)
---
 be/src/vec/exec/format/table/deltalake_reader.cpp  |  81 +++++++++++
 be/src/vec/exec/format/table/deltalake_reader.h    |  70 ++++++++++
 env.sh                                             |   0
 fe/be-java-extensions/deltalake-scanner/pom.xml    | 124 +++++++++++++++++
 .../doris/deltalake/DeltaLakeColumnValue.java      | 141 +++++++++++++++++++
 .../doris/deltalake/DeltaLakeJniScanner.java       | 104 ++++++++++++++
 .../doris/deltalake/DeltaLakeScannerUtils.java     |  47 +++++++
 .../src/main/resources/package.xml                 |  41 ++++++
 fe/be-java-extensions/pom.xml                      |   1 +
 fe/fe-core/pom.xml                                 |   7 +
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../apache/doris/planner/SingleNodePlanner.java    |   4 +
 .../doris/planner/external/FileQueryScanNode.java  |   4 +
 .../doris/planner/external/TableFormatType.java    |   1 +
 .../external/deltalake/DeltaLakeScanNode.java      | 153 +++++++++++++++++++++
 .../DeltaLakeSource.java}                          |  32 +++--
 .../DeltaLakeSplit.java}                           |  25 ++--
 .../org/apache/doris/statistics/DeriveFactory.java |   1 +
 .../apache/doris/statistics/StatisticalType.java   |   1 +
 fe/pom.xml                                         |   1 -
 gensrc/thrift/PlanNodes.thrift                     |   8 ++
 21 files changed, 824 insertions(+), 26 deletions(-)

diff --git a/be/src/vec/exec/format/table/deltalake_reader.cpp 
b/be/src/vec/exec/format/table/deltalake_reader.cpp
new file mode 100644
index 00000000000..f2698eaafd8
--- /dev/null
+++ b/be/src/vec/exec/format/table/deltalake_reader.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "deltalake_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+}
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Builds a JNI-backed Delta Lake reader. The constructor gathers the column
+// names of the requested slots and forwards the Delta Lake scan parameters
+// carried by `range` to the Java class DeltaLakeJniScanner via JniConnector.
+DeltaLakeJniReader::DeltaLakeJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+                                       RuntimeState* state, RuntimeProfile* profile,
+                                       const TFileRangeDesc& range)
+        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+    // All scanner parameters come from the Delta Lake section of the range descriptor.
+    const auto& delta_params = range.table_format_params.delta_lake_params;
+
+    std::map<String, String> params;
+    params["db_name"] = delta_params.db_name;
+    params["table_name"] = delta_params.table_name;
+    params["path"] = delta_params.path;
+    params["delta_lake_column_names"] = delta_params.delta_lake_column_names;
+    params["conf"] = delta_params.conf;
+
+    // Names of the columns this scan has to produce, in slot order.
+    std::vector<std::string> column_names;
+    for (auto* slot : _file_slot_descs) {
+        column_names.emplace_back(slot->col_name());
+    }
+
+    _jni_connector = std::make_unique<JniConnector>(
+            "org/apache/doris/deltalake/DeltaLakeJniScanner", params, column_names);
+}
+
+// Fetches the next batch of rows from the JNI connector into `block`.
+// When the connector reports end-of-file through `eof`, the connector is
+// closed before returning. Any error from the connector is propagated.
+Status DeltaLakeJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    if (!*eof) {
+        return Status::OK();
+    }
+    // Last batch delivered: close the connector now.
+    RETURN_IF_ERROR(_jni_connector->close());
+    return Status::OK();
+}
+
+// Reports the columns this reader provides: every requested slot is added to
+// `name_to_type` under its column name with the slot's type.
+// `missing_cols` is not modified by this reader.
+Status DeltaLakeJniReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    for (auto* slot : _file_slot_descs) {
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+// Remembers the per-column value ranges, initializes the JNI connector with
+// them and then opens it. Returns the first error encountered, if any.
+Status DeltaLakeJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    Status init_status = _jni_connector->init(colname_to_value_range);
+    if (!init_status.ok()) {
+        return init_status;
+    }
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/deltalake_reader.h 
b/be/src/vec/exec/format/table/deltalake_reader.h
new file mode 100644
index 00000000000..8e6668d1404
--- /dev/null
+++ b/be/src/vec/exec/format/table/deltalake_reader.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+}
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Reader for Delta Lake tables that delegates the actual scan to a Java
+// scanner through JniConnector.
+//
+// Usage order visible in the implementation: construct, call init_reader()
+// to initialize and open the connector, then call get_next_block() until
+// `eof` is set.
+class DeltaLakeJniReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(DeltaLakeJniReader);
+
+public:
+    // `file_slot_descs` is stored by reference, so the vector must outlive
+    // this reader. `range` supplies the Delta Lake scan parameters.
+    DeltaLakeJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+                       RuntimeProfile* profile, const TFileRangeDesc& range);
+
+    ~DeltaLakeJniReader() override = default;
+
+    // Reads the next batch into `block`; closes the connector once `eof` is set.
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    // Fills `name_to_type` with the name and type of every requested slot.
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    // Initializes the JNI connector with the column value ranges and opens it.
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    // Non-owning reference to the caller's slot descriptors.
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    // Set by init_reader(); initialized to nullptr so the pointer is never
+    // indeterminate before init_reader() has been called.
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/env.sh b/env.sh
old mode 100755
new mode 100644
diff --git a/fe/be-java-extensions/deltalake-scanner/pom.xml 
b/fe/be-java-extensions/deltalake-scanner/pom.xml
new file mode 100644
index 00000000000..0870c0199d1
--- /dev/null
+++ b/fe/be-java-extensions/deltalake-scanner/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>be-java-extensions</artifactId>
+        <groupId>org.apache.doris</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>deltalake-scanner</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>fe-common</artifactId>
+                    <groupId>org.apache.doris</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.9.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.facebook.presto.hive</groupId>
+            <artifactId>hive-apache</artifactId>
+            <version>${presto.hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-standalone_2.12</artifactId>
+            <version>3.0.0rc1</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>fe-core</artifactId>
+            <!-- Follow the build's version, like java-common above, instead of a hard-coded snapshot. -->
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <finalName>deltalake-scanner</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/resources/package.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeColumnValue.java
 
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeColumnValue.java
new file mode 100644
index 00000000000..dcf759bbc84
--- /dev/null
+++ 
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeColumnValue.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.deltalake;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import io.delta.standalone.data.RowRecord;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * {@link ColumnValue} view over a single field of a Delta Lake {@link RowRecord}.
+ *
+ * <p>The instance is mutable and meant to be reused: callers point it at a row with
+ * {@link #setRecord(RowRecord)} and at a column with {@link #setfieldName(String)}, then
+ * read the value through the typed getters, each of which delegates to the matching
+ * {@code RowRecord} accessor for the current field name.
+ */
+public class DeltaLakeColumnValue implements ColumnValue {
+
+    /** Name of the field the getters currently read. */
+    private String fieldName;
+    /** Row the getters currently read from. */
+    private RowRecord record;
+
+    public DeltaLakeColumnValue() {
+
+    }
+
+    /** Selects the field that subsequent getters read. */
+    public void setfieldName(String name) {
+        this.fieldName = name;
+    }
+
+    /** Selects the row that subsequent getters read from. */
+    public void setRecord(RowRecord record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return true;
+    }
+
+    @Override
+    public boolean isNull() {
+        return record.isNullAt(fieldName);
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return record.getBoolean(fieldName);
+    }
+
+    @Override
+    public byte getByte() {
+        return record.getByte(fieldName);
+    }
+
+    @Override
+    public short getShort() {
+        return record.getShort(fieldName);
+    }
+
+    @Override
+    public int getInt() {
+        return record.getInt(fieldName);
+    }
+
+    @Override
+    public float getFloat() {
+        return record.getFloat(fieldName);
+    }
+
+    @Override
+    public long getLong() {
+        return record.getLong(fieldName);
+    }
+
+    @Override
+    public double getDouble() {
+        return record.getDouble(fieldName);
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        // NOTE(review): the value is read as a 32-bit int; confirm whether a wider
+        // accessor is needed for the column types that reach this getter.
+        return BigInteger.valueOf(record.getInt(fieldName));
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return record.getBigDecimal(fieldName);
+    }
+
+    @Override
+    public String getString() {
+        return record.getString(fieldName);
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        // Encode explicitly as UTF-8 so the result does not depend on the JVM's
+        // platform default charset.
+        return record.getString(fieldName).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public LocalDate getDate() {
+        return record.getDate(fieldName).toLocalDate();
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        return record.getTimestamp(fieldName).toLocalDateTime();
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBinary(fieldName);
+    }
+
+    // TODO: complex types are not implemented yet; the three unpack methods
+    // below leave the output lists untouched.
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
diff --git 
a/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeJniScanner.java
 
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeJniScanner.java
new file mode 100644
index 00000000000..952593377e5
--- /dev/null
+++ 
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeJniScanner.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.deltalake;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.data.RowRecord;
+import io.delta.standalone.types.DataType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static 
org.apache.doris.deltalake.DeltaLakeScannerUtils.decodeStringToConf;
+
+/**
+ * JNI scanner that reads rows of a Delta Lake table through Delta Standalone and appends
+ * them, column by column, to the scanner's vector table.
+ *
+ * <p>Parameters read from the {@code params} map: {@code conf} (encoded Hadoop
+ * configuration), {@code db_name}, {@code table_name}, {@code path} and
+ * {@code delta_lake_column_names} (comma separated column names).
+ */
+public class DeltaLakeJniScanner extends JniScanner {
+    private static final Logger LOG = LoggerFactory.getLogger(DeltaLakeJniScanner.class);
+    private final String dbName;
+    private final String tblName;
+    private final String path;
+    private CloseableIterator<RowRecord> iter = null;
+    private final Configuration conf;
+    private final DeltaLakeColumnValue columnValue = new DeltaLakeColumnValue();
+
+    /**
+     * @param batchSize maximum number of rows returned by one {@link #getNext()} call
+     * @param params scanner parameters, see the class comment
+     * @throws IllegalArgumentException if the column name list is missing
+     */
+    public DeltaLakeJniScanner(int batchSize, Map<String, String> params) {
+        LOG.debug("params:{}", params);
+        conf = decodeStringToConf(params.get("conf"));
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        path = params.get("path");
+        super.batchSize = batchSize;
+        // The BE side sends the list under "delta_lake_column_names"; the previous
+        // spelling is still accepted as a fallback.
+        String columnNames = params.get("delta_lake_column_names");
+        if (columnNames == null) {
+            columnNames = params.get("deltalake_column_names");
+        }
+        if (columnNames == null) {
+            throw new IllegalArgumentException("Missing required param: delta_lake_column_names");
+        }
+        super.fields = columnNames.split(",");
+    }
+
+    /** Loads the Delta log once, then opens the row iterator and resolves the column types. */
+    @Override
+    public void open() throws IOException {
+        DeltaLog deltaLog = DeltaLog.forTable(conf, path);
+        initIter(deltaLog);
+        parseRequiredTypes(deltaLog);
+    }
+
+    /** Closes the row iterator if one was opened; safe to call more than once. */
+    @Override
+    public void close() throws IOException {
+        if (iter != null) {
+            iter.close();
+            iter = null;
+        }
+    }
+
+    /**
+     * Appends up to {@code batchSize} rows to the vector table.
+     *
+     * @return the number of rows appended; 0 when the iterator is exhausted
+     */
+    @Override
+    protected int getNext() {
+        int rows = 0;
+        // Stop at batchSize so a single call never drains the whole table.
+        while (rows < batchSize && iter.hasNext()) {
+            RowRecord row = iter.next();
+            columnValue.setRecord(row);
+            // NOTE(review): columns are indexed by `types`, which is built from the full
+            // table schema; confirm this lines up with the requested `fields`.
+            for (int i = 0; i < types.length; i++) {
+                columnValue.setfieldName(types[i].getName());
+                appendData(i, columnValue);
+            }
+            rows++;
+        }
+        return rows;
+    }
+
+    /** Opens a row iterator over the current snapshot of the table. */
+    private void initIter(DeltaLog deltaLog) {
+        iter = deltaLog.snapshot().open();
+    }
+
+    /** Derives one {@link ColumnType} per field of the table schema. */
+    private void parseRequiredTypes(DeltaLog deltaLog) {
+        StructType schema = deltaLog.snapshot().getMetadata().getSchema();
+        assert schema != null;
+        ColumnType[] columnTypes = new ColumnType[schema.length()];
+        int i = 0;
+        for (StructField field : schema.getFields()) {
+            String columnName = field.getName();
+            DataType dataType = field.getDataType();
+            columnTypes[i] = ColumnType.parseType(columnName, dataType.toString());
+            i++;
+        }
+        super.types = columnTypes;
+    }
+}
diff --git 
a/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeScannerUtils.java
 
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeScannerUtils.java
new file mode 100644
index 00000000000..9b15575f2d1
--- /dev/null
+++ 
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeScannerUtils.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.deltalake;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+
+
+/** Helpers shared by the Delta Lake JNI scanner. */
+public class DeltaLakeScannerUtils {
+    private static final Base64.Decoder BASE64_DECODER = Base64.getUrlDecoder();
+
+    /**
+     * Rebuilds a Hadoop {@link Configuration} from its encoded string form.
+     *
+     * <p>The string is decoded with the URL-safe Base64 decoder and the resulting bytes are
+     * fed to {@link Configuration#readFields} through an {@link ObjectInputStream}.
+     *
+     * @param encodedStr URL-safe Base64 text of the serialized configuration
+     * @return the decoded configuration
+     * @throws RuntimeException wrapping any failure while reading the configuration
+     */
+    public static Configuration decodeStringToConf(String encodedStr) {
+        final byte[] bytes = BASE64_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        // try-with-resources closes the stream on both the success and the failure path.
+        try (ObjectInputStream confInput = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            Configuration conf = new Configuration();
+            conf.readFields(confInput);
+            return conf;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git 
a/fe/be-java-extensions/deltalake-scanner/src/main/resources/package.xml 
b/fe/be-java-extensions/deltalake-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/deltalake-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                </excludes>
+            </unpackOptions>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index af8584d1776..59f59f1a80f 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -21,6 +21,7 @@ under the License.
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <modules>
+        <module>deltalake-scanner</module>
         <module>hudi-scanner</module>
         <module>java-common</module>
         <module>java-udf</module>
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 34f78581120..2d7d2b318c4 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -763,6 +763,13 @@ under the License.
             <groupId>org.immutables</groupId>
             <artifactId>value</artifactId>
         </dependency>
+
+        <!--  for delta lake catalog-->
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-core_2.12</artifactId>
+            <version>0.8.0</version>
+        </dependency>
     </dependencies>
     <repositories>
         <!-- for huawei obs sdk -->
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b2f59fdb55d..0d6f2f9c8c8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -45,6 +45,7 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.DeltaLakeExternalTable;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.catalog.external.IcebergExternalTable;
@@ -169,6 +170,7 @@ import org.apache.doris.planner.TableFunctionNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.MaxComputeScanNode;
+import org.apache.doris.planner.external.deltalake.DeltaLakeScanNode;
 import org.apache.doris.planner.external.hudi.HudiScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.planner.external.jdbc.JdbcScanNode;
@@ -488,6 +490,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
         } else if (table instanceof PaimonExternalTable) {
             scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+        } else if (table instanceof DeltaLakeExternalTable) {
+            scanNode = new DeltaLakeScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
         } else if (table instanceof MaxComputeExternalTable) {
             scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
         } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index e0bdfdce4de..add28b6df9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -70,6 +70,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.planner.external.FileQueryScanNode;
 import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.MaxComputeScanNode;
+import org.apache.doris.planner.external.deltalake.DeltaLakeScanNode;
 import org.apache.doris.planner.external.hudi.HudiScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.planner.external.jdbc.JdbcScanNode;
@@ -2036,6 +2037,9 @@ public class SingleNodePlanner {
             case PAIMON_EXTERNAL_TABLE:
                 scanNode = new PaimonScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
                 break;
+            case DELTALAKE_EXTERNAL_TABLE:
+                scanNode = new DeltaLakeScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
                 // TODO: support max compute scan node
                 scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "MCScanNode",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 7e9078df3d9..c4ca39291e6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -40,6 +40,8 @@ import org.apache.doris.datasource.hive.AcidInfo;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.deltalake.DeltaLakeScanNode;
+import org.apache.doris.planner.external.deltalake.DeltaLakeSplit;
 import org.apache.doris.planner.external.hudi.HudiScanNode;
 import org.apache.doris.planner.external.hudi.HudiSplit;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
@@ -362,6 +364,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) 
fileSplit);
             } else if (fileSplit instanceof HudiSplit) {
                 HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
+            } else if (fileSplit instanceof DeltaLakeSplit) {
+                DeltaLakeScanNode.setDeltaLakeParams(rangeDesc, 
(DeltaLakeSplit) fileSplit);
             }
 
             
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 891e138db6b..e7127efcf8e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -22,6 +22,7 @@ public enum TableFormatType {
     ICEBERG("iceberg"),
     HUDI("hudi"),
     PAIMON("paimon"),
+    DELTALAKE("deltalake"),
     TRANSACTIONAL_HIVE("transactional_hive");
 
     private final String tableFormatType;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeScanNode.java
new file mode 100644
index 00000000000..9c635308a5a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeScanNode.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.deltalake;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.external.DeltaLakeExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.HiveScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TDeltaLakeFileDesc;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.DeltaScan;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.data.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Scan node for Delta Lake external tables.
+ *
+ * <p>{@link #getSplits()} lists the {@code AddFile} entries of the snapshot returned by
+ * {@code DeltaLog.forTable(conf, location).snapshot()} and turns each one into a whole-file
+ * {@link DeltaLakeSplit}. {@link #getFileFormatType()} reports {@code FORMAT_JNI}.
+ */
+public class DeltaLakeScanNode extends HiveScanNode {
+    // NOTE(review): this field is static, yet it is reassigned by every doInitialize() call and read
+    // by the static setDeltaLakeParams(). All DeltaLakeScanNode instances therefore share whichever
+    // table was initialized last. Making it per-instance requires a non-static setDeltaLakeParams(),
+    // which would change that method's caller, so the problem is only flagged here.
+    private static DeltaLakeSource source = null;
+    // URL-safe Base64 alphabet without '=' padding, used by encodeConfToString().
+    private static final Base64.Encoder BASE64_ENCODER =
+            java.util.Base64.getUrlEncoder().withoutPadding();
+
+    public DeltaLakeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "DELTALAKE_SCAN_NODE", StatisticalType.DELTALAKE_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    /**
+     * Rejects views, then computes the column filter, initializes the backend policy, records the
+     * table and tuple descriptor in {@code source}, and initializes the schema params.
+     *
+     * @throws UserException an {@link AnalysisException} is thrown when the table is a view
+     */
+    @Override
+    protected void doInitialize() throws UserException {
+        DeltaLakeExternalTable table = (DeltaLakeExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                    String.format("Querying external view '%s.%s' is not supported", table.getDbName(),
+                            table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        source = new DeltaLakeSource(table, desc, columnNameToRange);
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    /**
+     * Fills {@code rangeDesc} with the Delta Lake table-format parameters.
+     *
+     * <p>The format type and file path come from {@code deltaLakeSplit}. The database name, table
+     * name, encoded Hadoop configuration and the comma-joined column names of the tuple's slots
+     * come from the shared static {@code source}.
+     *
+     * @param rangeDesc      file range descriptor that receives the table-format params
+     * @param deltaLakeSplit split whose format type and path are copied
+     */
+    public static void setDeltaLakeParams(TFileRangeDesc rangeDesc, DeltaLakeSplit deltaLakeSplit) {
+        DeltaLakeExternalTable table = source.getDeltalakeExtTable();
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(deltaLakeSplit.getTableFormatType().value());
+        TDeltaLakeFileDesc fileDesc = new TDeltaLakeFileDesc();
+        fileDesc.setDbName(table.getDbName());
+        fileDesc.setTableName(table.getName());
+        fileDesc.setPath(deltaLakeSplit.getPath().toString());
+        fileDesc.setConf(encodeConfToString(HiveMetaStoreClientHelper.getConfiguration(table)));
+        fileDesc.setDeltaLakeColumnNames(source.getDesc().getSlots().stream()
+                .map(slot -> slot.getColumn().getName())
+                .collect(Collectors.joining(",")));
+        tableFormatFileDesc.setDeltaLakeParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    /**
+     * Builds one split per {@code AddFile} returned by a scan of the table's Delta snapshot.
+     *
+     * @return the splits, in the order the scan's file iterator yields them
+     * @throws RuntimeException wrapping the {@link IOException} if closing the file iterator fails
+     */
+    @Override
+    public List<Split> getSplits() throws UserException {
+        DeltaLakeExternalTable table = source.getDeltalakeExtTable();
+        String path = table.getRemoteTable().getSd().getLocation();
+        Configuration conf = HiveMetaStoreClientHelper.getConfiguration(table);
+        Snapshot snapshot = DeltaLog.forTable(conf, path).snapshot();
+        List<String> partitionColumns = snapshot.getMetadata().getPartitionColumns();
+        DeltaScan deltaScan = snapshot.scan();
+        List<Split> splits = new ArrayList<>();
+        // try-with-resources closes the file iterator even when building a split throws.
+        try (CloseableIterator<AddFile> allFiles = deltaScan.getFiles()) {
+            while (allFiles.hasNext()) {
+                splits.add(getDeltaLakeSplit(allFiles.next(), partitionColumns, conf));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return splits;
+    }
+
+    /**
+     * Creates a split covering the whole data file described by {@code addFile}.
+     *
+     * <p>Partition values are collected in {@code partitionColumns} order. A column that is absent
+     * from the file's partition map is skipped, so the list may be shorter than
+     * {@code partitionColumns}.
+     */
+    @NotNull
+    private static DeltaLakeSplit getDeltaLakeSplit(AddFile addFile, List<String> partitionColumns,
+                                                    Configuration configuration) {
+        List<String> partitionValues = new ArrayList<>();
+        Map<String, String> map = addFile.getPartitionValues();
+        for (String key : partitionColumns) {
+            if (map.containsKey(key)) {
+                partitionValues.add(map.get(key));
+            }
+        }
+        // start = 0 and length = fileLength = file size: the split spans the entire file; hosts is null.
+        DeltaLakeSplit deltaLakeSplit = new DeltaLakeSplit(new Path(addFile.getPath()),
+                0, addFile.getSize(), addFile.getSize(), null, partitionValues, configuration);
+        deltaLakeSplit.setTableFormatType(TableFormatType.DELTALAKE);
+        return deltaLakeSplit;
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    /**
+     * Writes {@code conf} through an {@link ObjectOutputStream} and returns the resulting bytes as
+     * URL-safe, unpadded Base64 text.
+     *
+     * @param conf Hadoop configuration to encode
+     * @return Base64 text of the serialized configuration
+     * @throws RuntimeException wrapping any exception raised while serializing
+     */
+    public static String encodeConfToString(Configuration conf) {
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            // Closing the ObjectOutputStream flushes its buffered bytes into the byte buffer,
+            // so the buffer is read only after the try-with-resources block has finished.
+            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+                conf.write(out);
+            }
+            return BASE64_ENCODER.encodeToString(buffer.toByteArray());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSource.java
similarity index 52%
copy from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSource.java
index 891e138db6b..f6cbf6b837e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSource.java
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.deltalake;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi"),
-    PAIMON("paimon"),
-    TRANSACTIONAL_HIVE("transactional_hive");
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.external.DeltaLakeExternalTable;
+import org.apache.doris.planner.ColumnRange;
 
-    private final String tableFormatType;
+import java.util.Map;
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
+public class DeltaLakeSource  { // immutable holder pairing a Delta Lake external table with a tuple descriptor
+    private final DeltaLakeExternalTable deltalakeExtTable;
+    private final TupleDescriptor desc;
+
+    public DeltaLakeSource(DeltaLakeExternalTable table, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) { // NOTE(review): columnNameToRange is unused
+        this.deltalakeExtTable = table;
+        this.desc = desc;
+    }
+
+    public DeltaLakeExternalTable getDeltalakeExtTable() {
+        return deltalakeExtTable;
     }
 
-    public String value() {
-        return tableFormatType;
+    public TupleDescriptor getDesc() {
+        return desc;
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSplit.java
similarity index 59%
copy from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSplit.java
index 891e138db6b..0e287afa285 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSplit.java
@@ -15,22 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.deltalake;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi"),
-    PAIMON("paimon"),
-    TRANSACTIONAL_HIVE("transactional_hive");
+import org.apache.doris.planner.external.FileSplit;
 
-    private final String tableFormatType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
+import java.util.List;
+
+public class DeltaLakeSplit extends FileSplit { // FileSplit that additionally carries a Hadoop Configuration
+    private Configuration conf; // set once in the constructor; this class defines no accessor for it
 
-    public String value() {
-        return tableFormatType;
+    public DeltaLakeSplit(Path file, long start, long length, long fileLength, 
String[] hosts,
+            List<String> partitionValues, Configuration configuration) {
+        super(file, start, length, fileLength, hosts, partitionValues);
+        conf = configuration;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index ba22e067a8b..53b1ca8eaa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -52,6 +52,7 @@ public class DeriveFactory {
             case HIVE_SCAN_NODE:
             case ICEBERG_SCAN_NODE:
             case PAIMON_SCAN_NODE:
+            case DELTALAKE_SCAN_NODE:
             case INTERSECT_NODE:
             case SCHEMA_SCAN_NODE:
             case STREAM_LOAD_SCAN_NODE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 7fe9b03cbcf..33e0aecd57b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -34,6 +34,7 @@ public enum StatisticalType {
     ICEBERG_SCAN_NODE,
     PAIMON_SCAN_NODE,
     HUDI_SCAN_NODE,
+    DELTALAKE_SCAN_NODE,
     TVF_SCAN_NODE,
     INTERSECT_NODE,
     LOAD_SCAN_NODE,
diff --git a/fe/pom.xml b/fe/pom.xml
index 0e8193bc533..4f6665b1607 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -1326,7 +1326,6 @@ under the License.
                 <groupId>org.scala-lang</groupId>
                 <artifactId>scala-library</artifactId>
                 <version>${scala.version}</version>
-                <scope>provided</scope>
             </dependency>
             <dependency>
                 <groupId>com.esotericsoftware</groupId>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ff587ddc68c..0ea9c72269e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -308,6 +308,13 @@ struct TPaimonFileDesc {
     6: optional map<string, string> paimon_options
 }
 
+struct TDeltaLakeFileDesc { // per-file-range Delta Lake params, filled by DeltaLakeScanNode.setDeltaLakeParams
+    1: optional string path // path of the split's data file
+    2: optional string delta_lake_column_names // comma-joined column names of the scanned tuple's slots
+    3: optional string db_name
+    4: optional string table_name
+    5: optional string conf // Hadoop Configuration written via ObjectOutputStream, URL-safe Base64, no padding
+}
 
 struct THudiFileDesc {
     1: optional string instant_time;
@@ -338,6 +345,7 @@ struct TTableFormatFileDesc {
     3: optional THudiFileDesc hudi_params
     4: optional TPaimonFileDesc paimon_params
     5: optional TTransactionalHiveDesc transactional_hive_params
+    6: optional TDeltaLakeFileDesc delta_lake_params
 }
 
 enum TTextSerdeType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to