This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-deltalake
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-deltalake by this push:
new 53f4fd894b6 [feature-wip](catalog) support deltalake catalog (#25020)
53f4fd894b6 is described below
commit 53f4fd894b61a8bd7fceefd74d89f75d011bc497
Author: Peilin Sun <[email protected]>
AuthorDate: Sat Oct 28 22:25:27 2023 +0800
[feature-wip](catalog) support deltalake catalog (#25020)
---
be/src/vec/exec/format/table/deltalake_reader.cpp | 81 +++++++++++
be/src/vec/exec/format/table/deltalake_reader.h | 70 ++++++++++
env.sh | 0
fe/be-java-extensions/deltalake-scanner/pom.xml | 124 +++++++++++++++++
.../doris/deltalake/DeltaLakeColumnValue.java | 141 +++++++++++++++++++
.../doris/deltalake/DeltaLakeJniScanner.java | 104 ++++++++++++++
.../doris/deltalake/DeltaLakeScannerUtils.java | 47 +++++++
.../src/main/resources/package.xml | 41 ++++++
fe/be-java-extensions/pom.xml | 1 +
fe/fe-core/pom.xml | 7 +
.../glue/translator/PhysicalPlanTranslator.java | 4 +
.../apache/doris/planner/SingleNodePlanner.java | 4 +
.../doris/planner/external/FileQueryScanNode.java | 4 +
.../doris/planner/external/TableFormatType.java | 1 +
.../external/deltalake/DeltaLakeScanNode.java | 153 +++++++++++++++++++++
.../DeltaLakeSource.java} | 32 +++--
.../DeltaLakeSplit.java} | 25 ++--
.../org/apache/doris/statistics/DeriveFactory.java | 1 +
.../apache/doris/statistics/StatisticalType.java | 1 +
fe/pom.xml | 1 -
gensrc/thrift/PlanNodes.thrift | 8 ++
21 files changed, 824 insertions(+), 26 deletions(-)
diff --git a/be/src/vec/exec/format/table/deltalake_reader.cpp
b/be/src/vec/exec/format/table/deltalake_reader.cpp
new file mode 100644
index 00000000000..f2698eaafd8
--- /dev/null
+++ b/be/src/vec/exec/format/table/deltalake_reader.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "deltalake_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+}
+} // namespace doris
+
+namespace doris::vectorized {
+
+// Builds a reader that delegates scanning to the Java class
+// org/apache/doris/deltalake/DeltaLakeJniScanner through a JniConnector.
+// The column name of every file slot and the Delta Lake parameters carried by
+// `range` are handed to the connector at construction time.
+DeltaLakeJniReader::DeltaLakeJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+                                       RuntimeState* state, RuntimeProfile* profile,
+                                       const TFileRangeDesc& range)
+        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
+    // Column names, in slot order.
+    std::vector<std::string> required_columns;
+    for (auto& slot : _file_slot_descs) {
+        required_columns.emplace_back(slot->col_name());
+    }
+
+    // String parameters passed to the Java scanner.
+    const auto& delta_params = range.table_format_params.delta_lake_params;
+    std::map<String, String> scanner_params;
+    scanner_params["db_name"] = delta_params.db_name;
+    scanner_params["table_name"] = delta_params.table_name;
+    scanner_params["path"] = delta_params.path;
+    scanner_params["delta_lake_column_names"] = delta_params.delta_lake_column_names;
+    scanner_params["conf"] = delta_params.conf;
+
+    _jni_connector = std::make_unique<JniConnector>(
+            "org/apache/doris/deltalake/DeltaLakeJniScanner", scanner_params, required_columns);
+}
+
+// Fetches the next block from the JNI connector. Once the connector reports
+// end-of-file the connector is closed and the result of that close is returned.
+Status DeltaLakeJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    if (!*eof) {
+        return Status::OK();
+    }
+    return _jni_connector->close();
+}
+
+// Records the name and type of every file slot in `name_to_type`.
+// `missing_cols` is not written by this reader.
+Status DeltaLakeJniReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    for (SlotDescriptor* slot : _file_slot_descs) {
+        name_to_type->emplace(slot->col_name(), slot->type());
+    }
+    return Status::OK();
+}
+
+// Remembers the column value ranges, initializes the JNI connector with them
+// and then opens the connector. The first failing step's status is returned.
+Status DeltaLakeJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    Status init_status = _jni_connector->init(colname_to_value_range);
+    if (!init_status.ok()) {
+        return init_status;
+    }
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/deltalake_reader.h
b/be/src/vec/exec/format/table/deltalake_reader.h
new file mode 100644
index 00000000000..8e6668d1404
--- /dev/null
+++ b/be/src/vec/exec/format/table/deltalake_reader.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+}
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+// GenericReader implementation that reads Delta Lake data through a
+// JniConnector rather than a native file reader.
+class DeltaLakeJniReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(DeltaLakeJniReader);
+
+public:
+    // `file_slot_descs` is kept by reference (see _file_slot_descs), so the
+    // caller's vector must stay alive for as long as this reader is used.
+    DeltaLakeJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+                       RuntimeProfile* profile, const TFileRangeDesc& range);
+
+    ~DeltaLakeJniReader() override = default;
+
+    // Reads the next block from the connector; see the definition for EOF handling.
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    // Fills `name_to_type` from the file slot descriptors.
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    // Initializes and opens the JNI connector with the given value ranges.
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state = nullptr;
+    RuntimeProfile* _profile = nullptr;
+    // Assigned by init_reader(); stays null until then instead of holding an
+    // indeterminate pointer value.
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/env.sh b/env.sh
old mode 100755
new mode 100644
diff --git a/fe/be-java-extensions/deltalake-scanner/pom.xml
b/fe/be-java-extensions/deltalake-scanner/pom.xml
new file mode 100644
index 00000000000..0870c0199d1
--- /dev/null
+++ b/fe/be-java-extensions/deltalake-scanner/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>be-java-extensions</artifactId>
+ <groupId>org.apache.doris</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>deltalake-scanner</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>fe-common</artifactId>
+ <groupId>org.apache.doris</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.3</version>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.presto.hive</groupId>
+ <artifactId>hive-apache</artifactId>
+ <version>${presto.hive.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-standalone_2.12</artifactId>
+ <version>3.0.0rc1</version>
+ <scope>compile</scope>
+ </dependency>
+        <!-- Follow this module's own version, like the java-common dependency
+             above, instead of pinning a fixed snapshot version. -->
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>fe-core</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+ </dependencies>
+
+ <build>
+ <finalName>deltalake-scanner</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/resources/package.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeColumnValue.java
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeColumnValue.java
new file mode 100644
index 00000000000..dcf759bbc84
--- /dev/null
+++
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeColumnValue.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.deltalake;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import io.delta.standalone.data.RowRecord;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * {@link ColumnValue} view over a single named field of a Delta Standalone
+ * {@link RowRecord}.
+ *
+ * <p>The instance is mutable: {@link #setRecord(RowRecord)} and
+ * {@link #setfieldName(String)} choose which record and which field the
+ * getters read from. Every getter forwards to the matching {@code RowRecord}
+ * accessor for the current field name.
+ */
+public class DeltaLakeColumnValue implements ColumnValue {
+
+    private String fieldName;
+    private RowRecord record;
+
+    public DeltaLakeColumnValue() {
+
+    }
+
+    /** Selects the field (by name) that subsequent getters read. */
+    public void setfieldName(String name) {
+        this.fieldName = name;
+    }
+
+    /** Selects the record that subsequent getters read. */
+    public void setRecord(RowRecord record) {
+        this.record = record;
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return true;
+    }
+
+    @Override
+    public boolean isNull() {
+        return record.isNullAt(fieldName);
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return record.getBoolean(fieldName);
+    }
+
+    @Override
+    public byte getByte() {
+        return record.getByte(fieldName);
+    }
+
+    @Override
+    public short getShort() {
+        return record.getShort(fieldName);
+    }
+
+    @Override
+    public int getInt() {
+        return record.getInt(fieldName);
+    }
+
+    @Override
+    public float getFloat() {
+        return record.getFloat(fieldName);
+    }
+
+    @Override
+    public long getLong() {
+        return record.getLong(fieldName);
+    }
+
+    @Override
+    public double getDouble() {
+        return record.getDouble(fieldName);
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        // NOTE(review): the value is read as a 32-bit int before widening;
+        // confirm this is the intended source type for callers of getBigInteger().
+        return BigInteger.valueOf(record.getInt(fieldName));
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return record.getBigDecimal(fieldName);
+    }
+
+    @Override
+    public String getString() {
+        return record.getString(fieldName);
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        // Encode explicitly as UTF-8 so the bytes do not depend on the JVM's
+        // default charset (file.encoding / locale of the host process).
+        return record.getString(fieldName).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public LocalDate getDate() {
+        return record.getDate(fieldName).toLocalDate();
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        return record.getTimestamp(fieldName).toLocalDateTime();
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return record.getBinary(fieldName);
+    }
+
+    // The three unpack methods below are not implemented: they leave the
+    // supplied lists untouched, so array/map/struct fields yield no elements.
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
diff --git
a/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeJniScanner.java
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeJniScanner.java
new file mode 100644
index 00000000000..952593377e5
--- /dev/null
+++
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeJniScanner.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.deltalake;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.data.RowRecord;
+import io.delta.standalone.types.DataType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static
org.apache.doris.deltalake.DeltaLakeScannerUtils.decodeStringToConf;
+
+/**
+ * {@link JniScanner} that reads the rows of a Delta Lake table with Delta
+ * Standalone and appends them, field by field, through
+ * {@link DeltaLakeColumnValue}.
+ *
+ * <p>Scanner parameters read from {@code params}: {@code conf} (encoded Hadoop
+ * configuration), {@code db_name}, {@code table_name}, {@code path} and
+ * {@code delta_lake_column_names} (comma-separated column names).
+ */
+public class DeltaLakeJniScanner extends JniScanner {
+    private static final Logger LOG = LoggerFactory.getLogger(DeltaLakeJniScanner.class);
+    private final String dbName;
+    private final String tblName;
+    private final String path;
+    private CloseableIterator<RowRecord> iter = null;
+    private final Configuration conf;
+    private final DeltaLakeColumnValue columnValue = new DeltaLakeColumnValue();
+
+    /**
+     * @param batchSize maximum number of rows appended by one {@link #getNext()} call
+     * @param params scanner parameters, see the class comment
+     * @throws IllegalArgumentException if the column-name parameter is absent
+     */
+    public DeltaLakeJniScanner(int batchSize, Map<String, String> params) {
+        LOG.debug("params:{}", params);
+        conf = decodeStringToConf(params.get("conf"));
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        path = params.get("path");
+        // The BE-side DeltaLakeJniReader sends the names under
+        // "delta_lake_column_names"; the previously read spelling is still
+        // accepted as a fallback.
+        String columnNames = params.get("delta_lake_column_names");
+        if (columnNames == null) {
+            columnNames = params.get("deltalake_column_names");
+        }
+        if (columnNames == null) {
+            throw new IllegalArgumentException("missing scanner param: delta_lake_column_names");
+        }
+        super.batchSize = batchSize;
+        super.fields = columnNames.split(",");
+    }
+
+    @Override
+    public void open() throws IOException {
+        // Resolve the table once and share it between both steps. The types are
+        // parsed first so that a schema failure does not leave an open iterator.
+        DeltaLog deltaLog = DeltaLog.forTable(conf, path);
+        parseRequiredTypes(deltaLog);
+        initIter(deltaLog);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // open() may not have run or may have failed before the iterator existed.
+        if (iter != null) {
+            iter.close();
+        }
+    }
+
+    /**
+     * Appends up to {@code batchSize} rows from the table iterator.
+     *
+     * @return number of rows appended; 0 once the iterator is exhausted
+     */
+    @Override
+    protected int getNext() {
+        int rows = 0;
+        // Stop at batchSize so one call never drains the whole table.
+        while (rows < batchSize && iter.hasNext()) {
+            RowRecord row = iter.next();
+            columnValue.setRecord(row);
+            for (int i = 0; i < types.length; i++) {
+                columnValue.setfieldName(types[i].getName());
+                appendData(i, columnValue);
+            }
+            rows++;
+        }
+        return rows;
+    }
+
+    /** Opens a row iterator over the current snapshot of the table. */
+    private void initIter(DeltaLog deltaLog) {
+        iter = deltaLog.snapshot().open();
+    }
+
+    /**
+     * Derives {@code types} from the schema of the table's current snapshot.
+     *
+     * <p>NOTE(review): {@code types} covers every column of the table schema,
+     * while {@code fields} holds only the names passed in the scanner params;
+     * confirm the base class tolerates the two differing in length or order.
+     *
+     * @throws IOException if the table metadata has no schema
+     */
+    private void parseRequiredTypes(DeltaLog deltaLog) throws IOException {
+        StructType schema = deltaLog.snapshot().getMetadata().getSchema();
+        // An explicit check instead of `assert`, which is a no-op unless the JVM runs with -ea.
+        if (schema == null) {
+            throw new IOException("Delta Lake table has no schema: " + path);
+        }
+        ColumnType[] columnTypes = new ColumnType[schema.length()];
+        int i = 0;
+        for (StructField field : schema.getFields()) {
+            String columnName = field.getName();
+            DataType dataType = field.getDataType();
+            columnTypes[i] = ColumnType.parseType(columnName, dataType.toString());
+            i++;
+        }
+        super.types = columnTypes;
+    }
+}
diff --git
a/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeScannerUtils.java
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeScannerUtils.java
new file mode 100644
index 00000000000..9b15575f2d1
--- /dev/null
+++
b/fe/be-java-extensions/deltalake-scanner/src/main/java/org/apache/doris/deltalake/DeltaLakeScannerUtils.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.deltalake;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+
+
+/** Helpers shared by the Delta Lake JNI scanner. */
+public class DeltaLakeScannerUtils {
+    private static final Base64.Decoder BASE64_DECODER = Base64.getUrlDecoder();
+
+    /**
+     * Decodes a URL-safe Base64 string into a Hadoop {@link Configuration}.
+     *
+     * <p>The decoded bytes are read through an {@link ObjectInputStream} with
+     * {@code Configuration.readFields}; presumably the encoder wrote them with
+     * the matching {@code Configuration.write} over an ObjectOutputStream -
+     * confirm against the producing side.
+     *
+     * @param encodedStr URL-safe Base64 text of the serialized configuration
+     * @return the decoded configuration
+     * @throws RuntimeException wrapping any failure while reading the bytes
+     */
+    public static Configuration decodeStringToConf(String encodedStr) {
+        final byte[] bytes =
+                BASE64_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        // try-with-resources closes the stream on the failure path as well.
+        try (ObjectInputStream objectInputStream =
+                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
+            Configuration conf = new Configuration();
+            conf.readFields(objectInputStream);
+            return conf;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git
a/fe/be-java-extensions/deltalake-scanner/src/main/resources/package.xml
b/fe/be-java-extensions/deltalake-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/deltalake-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/Log4j2Plugins.dat</exclude>
+ </excludes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index af8584d1776..59f59f1a80f 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -21,6 +21,7 @@ under the License.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
+ <module>deltalake-scanner</module>
<module>hudi-scanner</module>
<module>java-common</module>
<module>java-udf</module>
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 34f78581120..2d7d2b318c4 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -763,6 +763,13 @@ under the License.
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
</dependency>
+
+ <!-- for delta lake catalog-->
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-core_2.12</artifactId>
+ <version>0.8.0</version>
+ </dependency>
</dependencies>
<repositories>
<!-- for huawei obs sdk -->
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b2f59fdb55d..0d6f2f9c8c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -45,6 +45,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.DeltaLakeExternalTable;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.IcebergExternalTable;
@@ -169,6 +170,7 @@ import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.MaxComputeScanNode;
+import org.apache.doris.planner.external.deltalake.DeltaLakeScanNode;
import org.apache.doris.planner.external.hudi.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.jdbc.JdbcScanNode;
@@ -488,6 +490,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else if (table instanceof PaimonExternalTable) {
scanNode = new PaimonScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ } else if (table instanceof DeltaLakeExternalTable) {
+ scanNode = new DeltaLakeScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index e0bdfdce4de..add28b6df9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -70,6 +70,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.planner.external.MaxComputeScanNode;
+import org.apache.doris.planner.external.deltalake.DeltaLakeScanNode;
import org.apache.doris.planner.external.hudi.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.jdbc.JdbcScanNode;
@@ -2036,6 +2037,9 @@ public class SingleNodePlanner {
case PAIMON_EXTERNAL_TABLE:
scanNode = new PaimonScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
break;
+ case DELTALAKE_EXTERNAL_TABLE:
+ scanNode = new DeltaLakeScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ break;
case MAX_COMPUTE_EXTERNAL_TABLE:
// TODO: support max compute scan node
scanNode = new MaxComputeScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), "MCScanNode",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 7e9078df3d9..c4ca39291e6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -40,6 +40,8 @@ import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.deltalake.DeltaLakeScanNode;
+import org.apache.doris.planner.external.deltalake.DeltaLakeSplit;
import org.apache.doris.planner.external.hudi.HudiScanNode;
import org.apache.doris.planner.external.hudi.HudiSplit;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
@@ -362,6 +364,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit)
fileSplit);
} else if (fileSplit instanceof HudiSplit) {
HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
+ } else if (fileSplit instanceof DeltaLakeSplit) {
+ DeltaLakeScanNode.setDeltaLakeParams(rangeDesc,
(DeltaLakeSplit) fileSplit);
}
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 891e138db6b..e7127efcf8e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -22,6 +22,7 @@ public enum TableFormatType {
ICEBERG("iceberg"),
HUDI("hudi"),
PAIMON("paimon"),
+ DELTALAKE("deltalake"),
TRANSACTIONAL_HIVE("transactional_hive");
private final String tableFormatType;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeScanNode.java
new file mode 100644
index 00000000000..9c635308a5a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeScanNode.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.deltalake;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.external.DeltaLakeExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.HiveScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TDeltaLakeFileDesc;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.DeltaScan;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.data.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Scan node for Delta Lake tables.
+ *
+ * <p>Splits are produced by listing the data files of the table's current snapshot through Delta
+ * Standalone ({@link DeltaLog}). Every split is reported with the {@link TFileFormatType#FORMAT_JNI}
+ * file format and is described to the backend by a {@link TDeltaLakeFileDesc}.
+ */
+public class DeltaLakeScanNode extends HiveScanNode {
+    // NOTE(review): this field is static, yet it is overwritten by every instance in doInitialize()
+    // and read by the static setDeltaLakeParams(). All scan nodes therefore share whichever source
+    // was initialized last, which looks unsafe when several Delta Lake scans are planned at the same
+    // time or within one query. Fixing it needs the table/descriptor to travel with DeltaLakeSplit
+    // (or a signature change) -- TODO confirm and follow up.
+    private static DeltaLakeSource source = null;
+    // URL-safe Base64 without '=' padding; used to ship the serialized Hadoop configuration as text.
+    private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();
+
+    public DeltaLakeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "DELTALAKE_SCAN_NODE", StatisticalType.DELTALAKE_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    /**
+     * Rejects views, then prepares column filters, the backend policy, the shared source and the
+     * schema parameters, in that order.
+     *
+     * @throws UserException an {@link AnalysisException} when the target table is a view, or whatever
+     *         the inherited initialization helpers raise
+     */
+    @Override
+    protected void doInitialize() throws UserException {
+        DeltaLakeExternalTable table = (DeltaLakeExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                    String.format("Querying external view '%s.%s' is not supported", table.getDbName(),
+                            table.getName()));
+        }
+        // Presumably computeColumnFilter() fills columnNameToRange, so it has to run before the
+        // source is built -- keep this call order.
+        computeColumnFilter();
+        initBackendPolicy();
+        source = new DeltaLakeSource(table, desc, columnNameToRange);
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    /**
+     * Attaches the Delta Lake specific parameters of one split to its file range descriptor.
+     *
+     * <p>Table name, database name, configuration and column names are taken from the static
+     * {@code source}, i.e. from the scan node that ran {@link #doInitialize()} most recently.
+     *
+     * @param rangeDesc range descriptor that receives the table format parameters
+     * @param deltaLakeSplit split providing the file path and the table format type
+     */
+    public static void setDeltaLakeParams(TFileRangeDesc rangeDesc, DeltaLakeSplit deltaLakeSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(deltaLakeSplit.getTableFormatType().value());
+
+        DeltaLakeExternalTable table = source.getDeltalakeExtTable();
+        TDeltaLakeFileDesc fileDesc = new TDeltaLakeFileDesc();
+        fileDesc.setDbName(table.getDbName());
+        fileDesc.setTableName(table.getName());
+        fileDesc.setPath(deltaLakeSplit.getPath().toString());
+        fileDesc.setConf(encodeConfToString(HiveMetaStoreClientHelper.getConfiguration(table)));
+        // Column names of all slots in the tuple, joined with ',' in slot order.
+        fileDesc.setDeltaLakeColumnNames(source.getDesc().getSlots().stream()
+                .map(slot -> slot.getColumn().getName())
+                .collect(Collectors.joining(",")));
+
+        tableFormatFileDesc.setDeltaLakeParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    /**
+     * Lists the files of the table's current snapshot and turns each of them into one split that
+     * covers the whole file.
+     *
+     * @return one {@link DeltaLakeSplit} per {@link AddFile} of the snapshot
+     * @throws RuntimeException if closing the file iterator fails
+     */
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        DeltaLakeExternalTable table = source.getDeltalakeExtTable();
+        String path = table.getRemoteTable().getSd().getLocation();
+        Configuration conf = HiveMetaStoreClientHelper.getConfiguration(table);
+        Snapshot snapshot = DeltaLog.forTable(conf, path).snapshot();
+        List<String> partitionColumns = snapshot.getMetadata().getPartitionColumns();
+        DeltaScan deltaScan = snapshot.scan();
+        // try-with-resources: the iterator is released even when building a split throws, which the
+        // previous "close after the loop" form did not guarantee.
+        try (CloseableIterator<AddFile> allFiles = deltaScan.getFiles()) {
+            while (allFiles.hasNext()) {
+                splits.add(getDeltaLakeSplit(allFiles.next(), partitionColumns, conf));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return splits;
+    }
+
+    /**
+     * Builds the split for a single data file.
+     *
+     * @param addFile file entry taken from the snapshot
+     * @param partitionColumns partition column names, in table order
+     * @param configuration Hadoop configuration stored on the split
+     * @return a split spanning the file from offset 0 to its full size, tagged as DELTALAKE
+     */
+    @NotNull
+    private static DeltaLakeSplit getDeltaLakeSplit(AddFile addFile, List<String> partitionColumns,
+            Configuration configuration) {
+        List<String> partitionValues = new ArrayList<>();
+        Map<String, String> map = addFile.getPartitionValues();
+        // NOTE(review): a partition column that is absent from the map is skipped, so the resulting
+        // list can be shorter than partitionColumns and lose positional alignment -- confirm that
+        // Delta always reports every partition column here.
+        for (String key : partitionColumns) {
+            if (map.containsKey(key)) {
+                partitionValues.add(map.get(key));
+            }
+        }
+        // NOTE(review): AddFile paths may be relative to the table root -- verify that consumers of
+        // this split resolve them against the table location.
+        long fileSize = addFile.getSize();
+        DeltaLakeSplit deltaLakeSplit = new DeltaLakeSplit(new Path(addFile.getPath()),
+                0, fileSize, fileSize, null, partitionValues, configuration);
+        deltaLakeSplit.setTableFormatType(TableFormatType.DELTALAKE);
+        return deltaLakeSplit;
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    /**
+     * Serializes a Hadoop configuration into a text form that fits into a thrift string field.
+     *
+     * @param conf configuration to serialize with {@link Configuration#write(java.io.DataOutput)}
+     * @return the serialized bytes as URL-safe Base64 without padding
+     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
+     */
+    public static String encodeConfToString(Configuration conf) {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        // The stream is closed on every path; closing also flushes the data into the buffer.
+        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+            conf.write(out);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        // Base64 output is pure ASCII, so this equals decoding the encoded bytes as UTF-8.
+        return BASE64_ENCODER.encodeToString(buffer.toByteArray());
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSource.java
similarity index 52%
copy from
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to
fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSource.java
index 891e138db6b..f6cbf6b837e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSource.java
@@ -15,22 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.deltalake;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon"),
- TRANSACTIONAL_HIVE("transactional_hive");
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.external.DeltaLakeExternalTable;
+import org.apache.doris.planner.ColumnRange;
- private final String tableFormatType;
+import java.util.Map;
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
+/**
+ * Immutable pair of the Delta Lake external table being scanned and the tuple descriptor of the
+ * scan that reads it.
+ */
+public class DeltaLakeSource {
+    private final DeltaLakeExternalTable deltalakeExtTable;
+    private final TupleDescriptor desc;
+
+    /**
+     * @param table external table to scan
+     * @param desc tuple descriptor of the scan
+     * @param columnNameToRange accepted for callers' convenience; this constructor neither stores
+     *        nor reads it
+     */
+    public DeltaLakeSource(DeltaLakeExternalTable table, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
+        this.deltalakeExtTable = table;
+        this.desc = desc;
+    }
+
+    /** Returns the table passed to the constructor. */
+    public DeltaLakeExternalTable getDeltalakeExtTable() {
+        return this.deltalakeExtTable;
     }
-    public String value() {
-        return tableFormatType;
+    /** Returns the tuple descriptor passed to the constructor. */
+    public TupleDescriptor getDesc() {
+        return this.desc;
     }
+
 }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSplit.java
similarity index 59%
copy from
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to
fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSplit.java
index 891e138db6b..0e287afa285 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/deltalake/DeltaLakeSplit.java
@@ -15,22 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner.external.deltalake;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon"),
- TRANSACTIONAL_HIVE("transactional_hive");
+import org.apache.doris.planner.external.FileSplit;
- private final String tableFormatType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
+import java.util.List;
+
+/**
+ * File split for a Delta Lake data file; a {@link FileSplit} that additionally keeps the Hadoop
+ * configuration it was created with.
+ */
+public class DeltaLakeSplit extends FileSplit {
+    // Stored by the constructor; nothing in this class reads it back.
+    private Configuration conf;
-    public String value() {
-        return tableFormatType;
+    /**
+     * Forwards the file range arguments unchanged to {@link FileSplit} and remembers the
+     * configuration.
+     */
+    public DeltaLakeSplit(Path file, long start, long length, long fileLength, String[] hosts,
+            List<String> partitionValues, Configuration configuration) {
+        super(file, start, length, fileLength, hosts, partitionValues);
+        this.conf = configuration;
     }
 }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index ba22e067a8b..53b1ca8eaa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -52,6 +52,7 @@ public class DeriveFactory {
case HIVE_SCAN_NODE:
case ICEBERG_SCAN_NODE:
case PAIMON_SCAN_NODE:
+ case DELTALAKE_SCAN_NODE:
case INTERSECT_NODE:
case SCHEMA_SCAN_NODE:
case STREAM_LOAD_SCAN_NODE:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 7fe9b03cbcf..33e0aecd57b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -34,6 +34,7 @@ public enum StatisticalType {
ICEBERG_SCAN_NODE,
PAIMON_SCAN_NODE,
HUDI_SCAN_NODE,
+ DELTALAKE_SCAN_NODE,
TVF_SCAN_NODE,
INTERSECT_NODE,
LOAD_SCAN_NODE,
diff --git a/fe/pom.xml b/fe/pom.xml
index 0e8193bc533..4f6665b1607 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -1326,7 +1326,6 @@ under the License.
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ff587ddc68c..0ea9c72269e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -308,6 +308,13 @@ struct TPaimonFileDesc {
6: optional map<string, string> paimon_options
}
+struct TDeltaLakeFileDesc { // Delta Lake parameters attached to a file range by DeltaLakeScanNode.setDeltaLakeParams
+ 1: optional string path // path of the split's data file (split.getPath().toString())
+ 2: optional string delta_lake_column_names // names of the scanned tuple's slot columns, joined with ','
+ 3: optional string db_name // database name of the external table
+ 4: optional string table_name // name of the external table
+ 5: optional string conf // Hadoop Configuration serialized via ObjectOutputStream, URL-safe Base64 without padding
+}
struct THudiFileDesc {
1: optional string instant_time;
@@ -338,6 +345,7 @@ struct TTableFormatFileDesc {
3: optional THudiFileDesc hudi_params
4: optional TPaimonFileDesc paimon_params
5: optional TTransactionalHiveDesc transactional_hive_params
+ 6: optional TDeltaLakeFileDesc delta_lake_params
}
enum TTextSerdeType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]