This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 60278229f47 branch-3.1: [feat](iceberg) Support read iceberg system tables and upgrade paimon and iceberg #51634 #51190 (#52248)
60278229f47 is described below
commit 60278229f470ef915d89a306240af3995bcdab97
Author: Socrates <[email protected]>
AuthorDate: Mon Jun 30 14:03:20 2025 +0800
branch-3.1: [feat](iceberg) Support read iceberg system tables and upgrade paimon and iceberg #51634 #51190 (#52248)
Support reading Iceberg system tables: #51190
Upgrade Paimon and Iceberg: #51634
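
For context, a minimal usage sketch of the new capability (catalog, database,
and table names here are illustrative, not taken from this patch):

    -- read an Iceberg system table through the iceberg_meta TVF;
    -- query_type accepts the metadata table names defined by
    -- org.apache.iceberg.MetadataTableType (snapshots, files, manifests, ...)
    SELECT * FROM iceberg_meta(
        "table" = "iceberg_ctl.test_db.sample",
        "query_type" = "snapshots"
    );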
---------
Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
be/src/vec/exec/format/jni_reader.cpp | 16 -
be/src/vec/exec/format/jni_reader.h | 17 +-
be/src/vec/exec/format/table/hudi_jni_reader.cpp | 15 +-
be/src/vec/exec/format/table/hudi_jni_reader.h | 5 -
.../format/table/iceberg_sys_table_jni_reader.cpp | 60 ++++
...jni_reader.h => iceberg_sys_table_jni_reader.h} | 30 +-
.../vec/exec/format/table/lakesoul_jni_reader.cpp | 20 +-
be/src/vec/exec/format/table/lakesoul_jni_reader.h | 17 +-
.../exec/format/table/max_compute_jni_reader.cpp | 13 -
.../vec/exec/format/table/max_compute_jni_reader.h | 5 -
be/src/vec/exec/format/table/paimon_jni_reader.cpp | 8 -
be/src/vec/exec/format/table/paimon_jni_reader.h | 3 -
.../format/table/trino_connector_jni_reader.cpp | 13 -
.../exec/format/table/trino_connector_jni_reader.h | 5 -
be/src/vec/exec/scan/vmeta_scanner.cpp | 26 +-
be/src/vec/exec/scan/vmeta_scanner.h | 7 +-
build.sh | 2 +
.../create_preinstalled_scripts/iceberg/run13.sql | 28 ++
.../iceberg-metadata-scanner/pom.xml | 86 +++++
.../doris/iceberg/IcebergSysTableColumnValue.java | 173 +++++++++
.../doris/iceberg/IcebergSysTableJniScanner.java | 138 ++++++++
.../src/main/resources/package.xml | 41 +++
fe/be-java-extensions/pom.xml | 1 +
fe/be-java-extensions/preload-extensions/pom.xml | 31 ++
.../java/org/apache/doris/catalog/StructType.java | 4 +
fe/fe-core/pom.xml | 2 +
.../main/java/org/apache/doris/catalog/Column.java | 4 +
.../datasource/iceberg/IcebergMetadataCache.java | 13 -
.../doris/datasource/iceberg/IcebergUtils.java | 22 +-
.../datasource/paimon/PaimonMetadataCache.java | 8 +-
...SnapshotsSysTable.java => IcebergSysTable.java} | 30 +-
.../datasource/systable/SupportedSysTables.java | 5 +-
.../datasource/tvf/source/MetadataScanNode.java | 44 ++-
.../expressions/functions/table/IcebergMeta.java | 8 +-
.../tablefunction/BackendsTableValuedFunction.java | 5 +-
.../tablefunction/CatalogsTableValuedFunction.java | 17 +-
.../FrontendsDisksTableValuedFunction.java | 5 +-
.../FrontendsTableValuedFunction.java | 5 +-
.../tablefunction/HudiTableValuedFunction.java | 4 +-
.../tablefunction/IcebergTableValuedFunction.java | 149 ++++----
.../tablefunction/JobsTableValuedFunction.java | 5 +-
.../doris/tablefunction/MetadataGenerator.java | 59 ---
.../tablefunction/MetadataTableValuedFunction.java | 6 +-
.../tablefunction/MvInfosTableValuedFunction.java | 5 +-
.../PartitionValuesTableValuedFunction.java | 4 +-
.../PartitionsTableValuedFunction.java | 5 +-
.../doris/tablefunction/TableValuedFunctionIf.java | 2 +-
.../tablefunction/TasksTableValuedFunction.java | 5 +-
.../iceberg/IcebergExternalTableTest.java | 118 +++++-
.../iceberg/source/IcebergScanNodeTest.java | 181 ----------
fe/pom.xml | 9 +-
gensrc/thrift/PlanNodes.thrift | 6 +-
gensrc/thrift/Types.thrift | 4 -
.../iceberg/test_iceberg_sys_table.out | Bin 1028 -> 95292 bytes
.../iceberg/test_iceberg_sys_table.groovy | 394 ++++++++++++++++-----
55 files changed, 1246 insertions(+), 642 deletions(-)
diff --git a/be/src/vec/exec/format/jni_reader.cpp b/be/src/vec/exec/format/jni_reader.cpp
index 1f0f5818446..40addd74de4 100644
--- a/be/src/vec/exec/format/jni_reader.cpp
+++ b/be/src/vec/exec/format/jni_reader.cpp
@@ -62,22 +62,6 @@ MockJniReader::MockJniReader(const std::vector<SlotDescriptor*>& file_slot_descs
params, column_names);
}
-Status MockJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
- if (*eof) {
- RETURN_IF_ERROR(_jni_connector->close());
- }
- return Status::OK();
-}
-
-Status MockJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- for (auto& desc : _file_slot_descs) {
- name_to_type->emplace(desc->col_name(), desc->type());
- }
- return Status::OK();
-}
-
Status MockJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h
index 487e1ee1e6b..de090b9ced4 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -50,6 +50,18 @@ public:
~JniReader() override = default;
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) override {
+ for (const auto& desc : _file_slot_descs) {
+ name_to_type->emplace(desc->col_name(), desc->type());
+ }
+ return Status::OK();
+ }
+
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override {
+ return _jni_connector->get_next_block(block, read_rows, eof);
+ }
+
Status close() override {
if (_jni_connector) {
return _jni_connector->close();
@@ -83,11 +95,6 @@ public:
~MockJniReader() override = default;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index f73f9a29225..eb1b1a16e24 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -61,7 +61,8 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
{"required_fields", join(required_fields, ",")},
{"instant_time", _hudi_params.instant_time},
{"serde", _hudi_params.serde},
- {"input_format", _hudi_params.input_format}};
+ {"input_format", _hudi_params.input_format},
+ {"time_zone", state->timezone_obj().name()}};
// Use compatible hadoop client to read data
for (const auto& kv : _scan_params.properties) {
@@ -76,18 +77,6 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
params, required_fields);
}
-Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status HudiJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- for (auto& desc : _file_slot_descs) {
- name_to_type->emplace(desc->col_name(), desc->type());
- }
- return Status::OK();
-}
-
Status HudiJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h
index 6fa4b0c836f..f29cd3c2bbb 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -52,11 +52,6 @@ public:
~HudiJniReader() override = default;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
new file mode 100644
index 00000000000..2d3519b7dd2
--- /dev/null
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "iceberg_sys_table_jni_reader.h"
+
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
+
+IcebergSysTableJniReader::IcebergSysTableJniReader(
+ const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+ RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
+ : JniReader(file_slot_descs, state, profile), _range_params(range_params) {}
+
+Status IcebergSysTableJniReader::init_reader(
+ const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+ std::vector<std::string> required_fields;
+ std::vector<std::string> required_types;
+ for (const auto& desc : _file_slot_descs) {
+ required_fields.emplace_back(desc->col_name());
+ required_types.emplace_back(JniConnector::get_jni_type(desc->type()));
+ }
+ std::map<std::string, std::string> params;
+ params["serialized_task"] = _range_params.serialized_task;
+ params["required_fields"] = join(required_fields, ",");
+ params["required_types"] = join(required_types, "#");
+ params["time_zone"] = _state->timezone_obj().name();
+ for (const auto& kv : _range_params.hadoop_props) {
+ params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+ }
+ _jni_connector =
+ std::make_unique<JniConnector>("org/apache/doris/iceberg/IcebergSysTableJniScanner",
+ std::move(params), required_fields);
+ if (_jni_connector == nullptr) {
+ return Status::InternalError("JniConnector failed to initialize");
+ }
+ RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+ return _jni_connector->open(_state, _profile);
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
similarity index 62%
copy from be/src/vec/exec/format/table/trino_connector_jni_reader.h
copy to be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
index cb0461bb4a6..ed867e46abe 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
@@ -17,12 +17,12 @@
#pragma once
-#include <stddef.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
-#include <memory>
+#include <cstddef>
#include <string>
#include <unordered_map>
-#include <unordered_set>
#include <vector>
#include "common/status.h"
@@ -36,31 +36,27 @@ class SlotDescriptor;
namespace vectorized {
class Block;
} // namespace vectorized
-struct TypeDescriptor;
} // namespace doris
namespace doris::vectorized {
+#include "common/compile_check_begin.h"
-class TrinoConnectorJniReader : public JniReader {
- ENABLE_FACTORY_CREATOR(TrinoConnectorJniReader);
+class IcebergSysTableJniReader : public JniReader {
+ ENABLE_FACTORY_CREATOR(IcebergSysTableJniReader);
public:
- static const std::string TRINO_CONNECTOR_OPTION_PREFIX;
- TrinoConnectorJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
- RuntimeState* state, RuntimeProfile* profile,
- const TFileRangeDesc& range);
+ IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+ RuntimeState* state, RuntimeProfile* profile,
+ const TIcebergMetadataParams& range_params);
- ~TrinoConnectorJniReader() override = default;
-
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
+ ~IcebergSysTableJniReader() override = default;
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
private:
- Status _set_spi_plugins_dir();
+ const TIcebergMetadataParams& _range_params;
};
+
+#include "common/compile_check_end.h"
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
index 2d03e95c215..e17e451c08a 100644
--- a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
@@ -23,8 +23,8 @@
#include "common/logging.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
-#include "runtime/types.h"
#include "vec/core/types.h"
+#include "vec/exec/format/jni_reader.h"
namespace doris {
class RuntimeProfile;
@@ -39,10 +39,7 @@ namespace doris::vectorized {
LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile)
- : _lakesoul_params(lakesoul_params),
- _file_slot_descs(file_slot_descs),
- _state(state),
- _profile(profile) {
+ : JniReader(file_slot_descs, state, profile), _lakesoul_params(lakesoul_params) {
std::vector<std::string> required_fields;
for (auto& desc : _file_slot_descs) {
required_fields.emplace_back(desc->col_name());
@@ -61,21 +58,8 @@ LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
params, required_fields);
}
-Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- for (auto& desc : _file_slot_descs) {
- name_to_type->emplace(desc->col_name(), desc->type());
- }
- return Status::OK();
-}
-
Status LakeSoulJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
- _colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
}
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
index fa6aa062d9f..723f2554bc7 100644
--- a/be/src/vec/exec/format/table/lakesoul_jni_reader.h
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
@@ -17,16 +17,13 @@
#pragma once
-#include <cstddef>
-#include <memory>
#include <string>
#include <unordered_map>
-#include <unordered_set>
#include <vector>
#include "common/status.h"
#include "exec/olap_common.h"
-#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/jni_reader.h"
#include "vec/exec/jni_connector.h"
namespace doris {
@@ -41,7 +38,7 @@ struct TypeDescriptor;
} // namespace doris
namespace doris::vectorized {
-class LakeSoulJniReader : public ::doris::vectorized::GenericReader {
+class LakeSoulJniReader : public JniReader {
ENABLE_FACTORY_CREATOR(LakeSoulJniReader);
public:
@@ -51,20 +48,10 @@ public:
~LakeSoulJniReader() override = default;
- Status get_next_block(::doris::vectorized::Block* block, size_t* read_rows, bool* eof) override;
-
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
private:
const TLakeSoulFileDesc& _lakesoul_params;
- const std::vector<SlotDescriptor*>& _file_slot_descs;
- RuntimeState* _state;
- RuntimeProfile* _profile;
- const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
- std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index cf50aad9c22..f67ff5574c8 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -86,19 +86,6 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
"org/apache/doris/maxcompute/MaxComputeJniScanner", params,
column_names);
}
-Status MaxComputeJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status MaxComputeJniReader::get_columns(
- std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- for (auto& desc : _file_slot_descs) {
- name_to_type->emplace(desc->col_name(), desc->type());
- }
- return Status::OK();
-}
-
Status MaxComputeJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h b/be/src/vec/exec/format/table/max_compute_jni_reader.h
index 56be385f4b6..8483a6e583b 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.h
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h
@@ -59,11 +59,6 @@ public:
~MaxComputeJniReader() override = default;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index e5d997e281a..1eff56bd857 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -105,14 +105,6 @@ Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eo
return _jni_connector->get_next_block(block, read_rows, eof);
}
-Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- for (auto& desc : _file_slot_descs) {
- name_to_type->emplace(desc->col_name(), desc->type());
- }
- return Status::OK();
-}
-
Status PaimonJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6ed9a57f62e..cfdd29ea389 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -60,9 +60,6 @@ public:
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
index 3a7b28b91a4..8b9261e6674 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
@@ -84,19 +84,6 @@ Status TrinoConnectorJniReader::init_reader(
return _jni_connector->open(_state, _profile);
}
-Status TrinoConnectorJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status TrinoConnectorJniReader::get_columns(
- std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) {
- for (auto& desc : _file_slot_descs) {
- name_to_type->emplace(desc->col_name(), desc->type());
- }
- return Status::OK();
-}
-
Status TrinoConnectorJniReader::_set_spi_plugins_dir() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
index cb0461bb4a6..05403094058 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
@@ -52,11 +52,6 @@ public:
~TrinoConnectorJniReader() override = default;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
- Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
-
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index ff68fd67637..a0b5f7ef9b6 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -24,10 +24,9 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
-#include <algorithm>
#include <ostream>
#include <string>
-#include <utility>
+#include <unordered_map>
#include "common/logging.h"
#include "runtime/client_cache.h"
@@ -35,15 +34,14 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
-#include "runtime/types.h"
#include "util/thrift_rpc_helper.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
+#include "vec/exec/format/table/iceberg_sys_table_jni_reader.h"
namespace doris {
class RuntimeProfile;
@@ -66,7 +64,16 @@ VMetaScanner::VMetaScanner(RuntimeState* state, pipeline::ScanLocalStateBase* lo
Status VMetaScanner::open(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::open";
RETURN_IF_ERROR(VScanner::open(state));
- RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
+ if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
+ // TODO: refactor this code
+ auto reader = IcebergSysTableJniReader::create_unique(
+ _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.iceberg_params);
+ const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
+ RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
+ _reader = std::move(reader);
+ } else {
+ RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
+ }
return Status::OK();
}
@@ -82,6 +89,12 @@ Status VMetaScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
if (nullptr == state || nullptr == block || nullptr == eof) {
return Status::InternalError("input is NULL pointer");
}
+ if (_reader) {
+ // TODO: This is a temporary workaround; the code is planned to be refactored later.
+ size_t read_rows = 0;
+ return _reader->get_next_block(block, &read_rows, eof);
+ }
+
if (_meta_eos == true) {
*eof = true;
return Status::OK();
@@ -531,6 +544,9 @@ Status VMetaScanner::_build_partition_values_metadata_request(
Status VMetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::close";
+ if (_reader) {
+ RETURN_IF_ERROR(_reader->close());
+ }
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h
index 3942fd793d9..7b344faf18b 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -19,14 +19,14 @@
#include <gen_cpp/Data_types.h>
#include <gen_cpp/Types_types.h>
-#include <stdint.h>
+#include <cstdint>
#include <vector>
#include "common/factory_creator.h"
#include "common/global_types.h"
#include "common/status.h"
-#include "vec/data_types/data_type.h"
+#include "vec/exec/format/generic_reader.h"
#include "vec/exec/scan/vscanner.h"
namespace doris {
@@ -96,5 +96,8 @@ private:
const TupleDescriptor* _tuple_desc = nullptr;
std::vector<TRow> _batch_data;
const TScanRange& _scan_range;
+
+ // for reading metadata using reader from be
+ std::unique_ptr<GenericReader> _reader;
};
} // namespace doris::vectorized
diff --git a/build.sh b/build.sh
index 88e9ee4c0b0..169f5a8c72a 100755
--- a/build.sh
+++ b/build.sh
@@ -539,6 +539,7 @@ if [[ "${BUILD_HIVE_UDF}" -eq 1 ]]; then
fi
if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("fe-common")
+ modules+=("be-java-extensions/iceberg-metadata-scanner")
modules+=("be-java-extensions/hadoop-hudi-scanner")
modules+=("be-java-extensions/java-common")
modules+=("be-java-extensions/java-udf")
@@ -853,6 +854,7 @@ EOF
extensions_modules+=("avro-scanner")
extensions_modules+=("lakesoul-scanner")
extensions_modules+=("preload-extensions")
+ extensions_modules+=("iceberg-metadata-scanner")
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
IFS=',' read -r -a ignore_modules <<<"${BE_EXTENSION_IGNORE}"
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
new file mode 100644
index 00000000000..4d680f009df
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
@@ -0,0 +1,28 @@
+use demo.test_db;
+
+SET spark.sql.catalog.spark_catalog.write.delete.mode = merge-on-read;
+SET spark.sql.catalog.spark_catalog.write.update.mode = merge-on-read;
+
+CREATE TABLE test_iceberg_systable_unpartitioned (
+ id INT,
+ name STRING
+)
+USING ICEBERG;
+
+CREATE TABLE test_iceberg_systable_partitioned (
+ id INT,
+ name STRING
+)
+USING ICEBERG
+PARTITIONED BY (id);
+
+INSERT INTO test_iceberg_systable_unpartitioned VALUES
+(1, 'Alice'), (2, 'Bob'), (3, 'Carol'), (4, 'Dave'), (5, 'Eve'),
+(6, 'Frank'), (7, 'Grace'), (8, 'Heidi'), (9, 'Ivan'), (10, 'Judy');
+
+INSERT INTO test_iceberg_systable_partitioned VALUES
+(1, 'Alice'), (2, 'Bob'), (3, 'Carol'), (4, 'Dave'), (5, 'Eve'),
+(6, 'Frank'), (7, 'Grace'), (8, 'Heidi'), (9, 'Ivan'), (10, 'Judy');
+
+DELETE FROM test_iceberg_systable_unpartitioned WHERE id % 2 = 1;
+DELETE FROM test_iceberg_systable_partitioned WHERE id % 2 = 1;
\ No newline at end of file
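
The DELETE statements above run under merge-on-read, so they write Iceberg
delete files rather than rewriting data files. A sketch of how the new system
tables can surface them from Doris (the catalog name "iceberg" is illustrative):

    SELECT * FROM iceberg_meta(
        "table" = "iceberg.test_db.test_iceberg_systable_partitioned",
        "query_type" = "delete_files"
    );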
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/pom.xml b/fe/be-java-extensions/iceberg-metadata-scanner/pom.xml
new file mode 100644
index 00000000000..f563ddf1837
--- /dev/null
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>be-java-extensions</artifactId>
+ <groupId>org.apache.doris</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>iceberg-metadata-scanner</artifactId>
+
+ <properties>
+ <doris.home>${basedir}/../../</doris.home>
+ <fe_ut_parallel>1</fe_ut_parallel>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>iceberg-metadata-scanner</finalName>
+ <directory>${project.basedir}/target/</directory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/resources/package.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableColumnValue.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableColumnValue.java
new file mode 100644
index 00000000000..b1caff8ae8f
--- /dev/null
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableColumnValue.java
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.iceberg;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import org.apache.iceberg.StructLike;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergSysTableColumnValue implements ColumnValue {
+ private static final String DEFAULT_TIME_ZONE = "Asia/Shanghai";
+
+ private final Object fieldData;
+ private final String timezone;
+
+ public IcebergSysTableColumnValue(Object fieldData) {
+ this(fieldData, DEFAULT_TIME_ZONE);
+ }
+
+ public IcebergSysTableColumnValue(Object fieldData, String timezone) {
+ this.fieldData = fieldData;
+ this.timezone = timezone;
+ }
+
+ @Override
+ public boolean canGetStringAsBytes() {
+ return true;
+ }
+
+ @Override
+ public boolean isNull() {
+ return fieldData == null;
+ }
+
+ @Override
+ public boolean getBoolean() {
+ return (boolean) fieldData;
+ }
+
+ @Override
+ public byte getByte() {
+ return (byte) fieldData;
+ }
+
+ @Override
+ public short getShort() {
+ return (short) fieldData;
+ }
+
+ @Override
+ public int getInt() {
+ return (int) fieldData;
+ }
+
+ @Override
+ public float getFloat() {
+ return (float) fieldData;
+ }
+
+ @Override
+ public long getLong() {
+ return (long) fieldData;
+ }
+
+ @Override
+ public double getDouble() {
+ return (double) fieldData;
+ }
+
+ @Override
+ public BigInteger getBigInteger() {
+ return (BigInteger) fieldData;
+ }
+
+ @Override
+ public BigDecimal getDecimal() {
+ return (BigDecimal) fieldData;
+ }
+
+ @Override
+ public String getString() {
+ return (String) fieldData;
+ }
+
+ @Override
+ public byte[] getStringAsBytes() {
+ if (fieldData instanceof String) {
+ return ((String) fieldData).getBytes();
+ } else if (fieldData instanceof byte[]) {
+ return (byte[]) fieldData;
+ } else if (fieldData instanceof CharBuffer) {
+ CharBuffer buffer = (CharBuffer) fieldData;
+ return buffer.toString().getBytes();
+ } else if (fieldData instanceof ByteBuffer) {
+ ByteBuffer buffer = (ByteBuffer) fieldData;
+ byte[] res = new byte[buffer.limit()];
+ buffer.get(res);
+ return res;
+ } else {
+ throw new UnsupportedOperationException(
+ "Cannot convert fieldData of type " + (fieldData == null ?
"null" : fieldData.getClass().getName())
+ + " to byte[].");
+ }
+ }
+
+ @Override
+ public LocalDate getDate() {
+ return Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).plusDays((int) fieldData).toLocalDate();
+ }
+
+ @Override
+ public LocalDateTime getDateTime() {
+ Instant instant = Instant.ofEpochMilli((((long) fieldData) / 1000));
+ return LocalDateTime.ofInstant(instant, ZoneId.of(timezone));
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return (byte[]) fieldData;
+ }
+
+ @Override
+ public void unpackArray(List<ColumnValue> values) {
+ List<?> items = (List<?>) fieldData;
+ for (Object item : items) {
+ values.add(new IcebergSysTableColumnValue(item, timezone));
+ }
+ }
+
+ @Override
+ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+ Map<?, ?> data = (Map<?, ?>) fieldData;
+ data.forEach((key, value) -> {
+ keys.add(new IcebergSysTableColumnValue(key, timezone));
+ values.add(new IcebergSysTableColumnValue(value, timezone));
+ });
+ }
+
+ @Override
+ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+ StructLike record = (StructLike) fieldData;
+ for (Integer fieldIndex : structFieldIndex) {
+ Object rawValue = record.get(fieldIndex, Object.class);
+ values.add(new IcebergSysTableColumnValue(rawValue, timezone));
+ }
+ }
+}
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
new file mode 100644
index 00000000000..350d0d872c3
--- /dev/null
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.iceberg;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValue;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+/**
+ * JniScanner to read Iceberg SysTables
+ */
+public class IcebergSysTableJniScanner extends JniScanner {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergSysTableJniScanner.class);
+ private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+ private final ClassLoader classLoader;
+ private final PreExecutionAuthenticator preExecutionAuthenticator;
+ private final FileScanTask scanTask;
+ private final List<NestedField> fields;
+ private final String timezone;
+ private CloseableIterator<StructLike> reader;
+
+ public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
+ this.classLoader = this.getClass().getClassLoader();
+ this.scanTask = SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
+ String[] requiredFields = params.get("required_fields").split(",");
+ this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields);
+ this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
+ Map<String, String> hadoopOptionParams = params.entrySet().stream()
+ .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+ .collect(Collectors
+ .toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+ this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
+ ColumnType[] requiredTypes = parseRequiredTypes(params.get("required_types").split("#"), requiredFields);
+ initTableInfo(requiredTypes, requiredFields, batchSize);
+ }
+
+ @Override
+ public void open() throws IOException {
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ preExecutionAuthenticator.execute(() -> {
+ // execute FileScanTask to get rows
+ reader = scanTask.asDataTask().rows().iterator();
+ return null;
+ });
+ } catch (Exception e) {
+ this.close();
+ String msg = String.format("Failed to open
IcebergMetadataJniScanner");
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+ }
+
+ @Override
+ protected int getNext() throws IOException {
+ if (reader == null) {
+ return 0;
+ }
+ int rows = 0;
+ while (reader.hasNext() && rows < getBatchSize()) {
+ StructLike row = reader.next();
+ for (int i = 0; i < fields.size(); i++) {
+ NestedField field = fields.get(i);
+ Object value = row.get(i, field.type().typeId().javaClass());
+ ColumnValue columnValue = new IcebergSysTableColumnValue(value, timezone);
+ appendData(i, columnValue);
+ }
+ rows++;
+ }
+ return rows;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ // Close the iterator to release resources
+ reader.close();
+ }
+ }
+
+ private static List<NestedField> selectSchema(StructType schema, String[] requiredFields) {
+ List<NestedField> selectedFields = new ArrayList<>();
+ for (String requiredField : requiredFields) {
+ NestedField field = schema.field(requiredField);
+ if (field == null) {
+ throw new IllegalArgumentException("RequiredField " +
requiredField + " not found in schema");
+ }
+ selectedFields.add(field);
+ }
+ return selectedFields;
+ }
+
+ private static ColumnType[] parseRequiredTypes(String[] typeStrings, String[] requiredFields) {
+ ColumnType[] requiredTypes = new ColumnType[typeStrings.length];
+ for (int i = 0; i < typeStrings.length; i++) {
+ String type = typeStrings[i];
+ ColumnType parsedType = ColumnType.parseType(requiredFields[i], type);
+ if (parsedType.isUnsupported()) {
+ throw new IllegalArgumentException("Unsupported type " + type
+ " for field " + requiredFields[i]);
+ }
+ requiredTypes[i] = parsedType;
+ }
+ return requiredTypes;
+ }
+}
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/resources/package.xml b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/Log4j2Plugins.dat</exclude>
+ </excludes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 049cb6b516d..67c415e4550 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -21,6 +21,7 @@ under the License.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
+ <module>iceberg-metadata-scanner</module>
<module>hadoop-hudi-scanner</module>
<module>java-common</module>
<module>java-udf</module>
diff --git a/fe/be-java-extensions/preload-extensions/pom.xml b/fe/be-java-extensions/preload-extensions/pom.xml
index 7ba5461126a..b64f450de18 100644
--- a/fe/be-java-extensions/preload-extensions/pom.xml
+++ b/fe/be-java-extensions/preload-extensions/pom.xml
@@ -211,6 +211,37 @@ under the License.
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-aws</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <!-- these dependencies are for iceberg-aws -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <version>${awssdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>${awssdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kms</artifactId>
+ <version>${awssdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>glue</artifactId>
+ <version>${awssdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>dynamodb</artifactId>
+ <version>${awssdk.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java
index 724310cca0a..0fd9839bc5b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java
@@ -58,6 +58,10 @@ public class StructType extends Type {
}
}
+ public StructType(StructField... fields) {
+ this(new ArrayList<>(Arrays.asList(fields)));
+ }
+
public StructType(List<Type> types) {
Preconditions.checkNotNull(types);
ArrayList<StructField> newFields = new ArrayList<>();
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 083dd9ee751..47f90706077 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -609,10 +609,12 @@ under the License.
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
+ <version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
+ <version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 2a15627ec42..8bbceac923e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -189,6 +189,10 @@ public class Column implements GsonPostProcessable {
this(name, type, false, null, isAllowNull, null, "");
}
+ public Column(String name, Type type, boolean isAllowNull, String comment) {
+ this(name, type, false, null, isAllowNull, null, comment);
+ }
+
public Column(String name, Type type) {
this(name, type, false, null, false, null, "");
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index f3daf2d2795..b776a8c3a47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -17,16 +17,13 @@
package org.apache.doris.datasource.iceberg;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
-import org.apache.doris.thrift.TIcebergMetadataParams;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Iterables;
@@ -78,16 +75,6 @@ public class IcebergMetadataCache {
this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor);
}
- public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) throws UserException {
- CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog());
- if (catalog == null) {
- throw new UserException("The specified catalog does not exist:" + params.getCatalog());
- }
- IcebergMetadataCacheKey key =
- IcebergMetadataCacheKey.of(catalog, params.getDatabase(), params.getTable());
- return snapshotListCache.get(key);
- }
-
public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName);
return tableCache.get(key);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index ab0d84a2b26..f61c226ed75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -601,14 +601,7 @@ public class IcebergUtils {
}
Preconditions.checkNotNull(schema,
"Schema for table " + catalog.getName() + "." + dbName
+ "." + name + " is null");
- List<Types.NestedField> columns = schema.columns();
- List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
- for (Types.NestedField field : columns) {
- tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
- IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
- schema.caseInsensitiveFindField(field.name()).fieldId()));
- }
- return tmpSchema;
+ return parseSchema(schema);
});
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
@@ -616,6 +609,19 @@ public class IcebergUtils {
}
+ /**
+ * Parse iceberg schema to doris schema
+ */
+ public static List<Column> parseSchema(Schema schema) {
+ List<Types.NestedField> columns = schema.columns();
+ List<Column> resSchema = Lists.newArrayListWithCapacity(columns.size());
+ for (Types.NestedField field : columns) {
+ resSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+ IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+ schema.caseInsensitiveFindField(field.name()).fieldId()));
+ }
+ return resSchema;
+ }
/**
* Estimate iceberg table row count.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index e6023743f3b..d5b49aa82d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -29,6 +29,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.Table;
import org.jetbrains.annotations.NotNull;
@@ -37,6 +38,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
@@ -86,9 +88,9 @@ public class PaimonMetadataCache {
// snapshotId and schemaId
Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
long latestSchemaId = 0L;
- OptionalLong optionalSnapshotId = table.latestSnapshotId();
- if (optionalSnapshotId.isPresent()) {
- latestSnapshotId = optionalSnapshotId.getAsLong();
+ Optional<Snapshot> optionalSnapshot = table.latestSnapshot();
+ if (optionalSnapshot.isPresent()) {
+ latestSnapshotId = optionalSnapshot.get().id();
latestSchemaId = table.snapshot(latestSnapshotId).schemaId();
snapshotTable =
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), latestSnapshotId.toString()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSnapshotsSysTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
similarity index 71%
rename from fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSnapshotsSysTable.java
rename to fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
index 3813ef68488..eaadc0a8a89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSnapshotsSysTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
@@ -25,27 +25,41 @@ import org.apache.doris.tablefunction.IcebergTableValuedFunction;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.iceberg.MetadataTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
-// table$snapshots
-public class IcebergSnapshotsSysTable extends SysTable {
- private static final Logger LOG = LogManager.getLogger(IcebergSnapshotsSysTable.class);
+// table${sysTable}
+public class IcebergSysTable extends SysTable {
+ private static final Logger LOG = LogManager.getLogger(IcebergSysTable.class);
+ // iceberg system tables:
+ // see @{org.apache.iceberg.MetadataTableType}
+ private static final List<IcebergSysTable> SUPPORTED_ICEBERG_SYS_TABLES = Arrays
+ .stream(MetadataTableType.values())
+ .map(type -> new IcebergSysTable(type.name().toLowerCase()))
+ .collect(Collectors.toList());
- public static final IcebergSnapshotsSysTable INSTANCE = new IcebergSnapshotsSysTable();
+ private final String tableName;
- private IcebergSnapshotsSysTable() {
- super("snapshots", "iceberg_meta");
+ private IcebergSysTable(String tableName) {
+ super(tableName, "iceberg_meta");
+ this.tableName = tableName;
+ }
+
+ public static List<IcebergSysTable> getSupportedIcebergSysTables() {
+ return SUPPORTED_ICEBERG_SYS_TABLES;
}
@Override
public TableValuedFunction createFunction(String ctlName, String dbName, String sourceNameWithMetaName) {
List<String> nameParts = Lists.newArrayList(ctlName, dbName, getSourceTableName(sourceNameWithMetaName));
- return IcebergMeta.createSnapshots(nameParts);
+ return IcebergMeta.createIcebergMeta(nameParts, tableName);
}
@Override
@@ -54,7 +68,7 @@ public class IcebergSnapshotsSysTable extends SysTable {
getSourceTableName(sourceNameWithMetaName));
Map<String, String> params = Maps.newHashMap();
params.put(IcebergTableValuedFunction.TABLE, Joiner.on(".").join(nameParts));
- params.put(IcebergTableValuedFunction.QUERY_TYPE, "snapshots");
+ params.put(IcebergTableValuedFunction.QUERY_TYPE, tableName);
try {
return new TableValuedFunctionRef(tvfName, null, params);
} catch (org.apache.doris.common.AnalysisException e) {
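
A sketch of the rewrite this class implements: a sysTable suffix on a table
name expands to the iceberg_meta TVF with the suffix as query_type
(identifiers illustrative):

    SELECT * FROM iceberg_ctl.test_db.sample$snapshots;
    -- is equivalent to:
    SELECT * FROM iceberg_meta(
        "table" = "iceberg_ctl.test_db.sample",
        "query_type" = "snapshots"
    );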
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
index 15bdf80aebb..9b471030d45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import java.util.List;
public class SupportedSysTables {
+ // TODO: use kv map
public static final List<SysTable> HIVE_SUPPORTED_SYS_TABLES;
public static final List<SysTable> ICEBERG_SUPPORTED_SYS_TABLES;
public static final List<SysTable> PAIMON_SUPPORTED_SYS_TABLES;
@@ -32,8 +33,8 @@ public class SupportedSysTables {
HIVE_SUPPORTED_SYS_TABLES = Lists.newArrayList();
HIVE_SUPPORTED_SYS_TABLES.add(PartitionsSysTable.INSTANCE);
// iceberg
- ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList();
- ICEBERG_SUPPORTED_SYS_TABLES.add(IcebergSnapshotsSysTable.INSTANCE);
+ ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList(
+ IcebergSysTable.getSupportedIcebergSysTables());
// paimon
PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList();
// hudi
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index 701850691f3..ac6281112e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -27,6 +27,7 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.MetadataTableValuedFunction;
import org.apache.doris.thrift.TMetaScanNode;
+import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
@@ -42,7 +43,7 @@ import java.util.List;
public class MetadataScanNode extends ExternalScanNode {
private final MetadataTableValuedFunction tvf;
-
+ private boolean initedScanRangeLocations = false;
private final List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) {
@@ -54,7 +55,6 @@ public class MetadataScanNode extends ExternalScanNode {
@Override
public void init() throws UserException {
super.init();
- createScanRangeLocations();
}
@Override
@@ -69,24 +69,36 @@ public class MetadataScanNode extends ExternalScanNode {
}
@Override
- protected void createScanRangeLocations() throws UserException {
- TScanRange scanRange = new TScanRange();
- scanRange.setMetaScanRange(tvf.getMetaScanRange());
- // set location
- TScanRangeLocation location = new TScanRangeLocation();
- Backend backend = backendPolicy.getNextBe();
- location.setBackendId(backend.getId());
- location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
-
- TScanRangeLocations locations = new TScanRangeLocations();
- locations.addToLocations(location);
- locations.setScanRange(scanRange);
-
- scanRangeLocations.add(locations);
+ protected void createScanRangeLocations() {
+ List<String> requiredFileds = desc.getSlots().stream()
+ .filter(slot -> slot.isMaterialized())
+ .map(slot -> slot.getColumn().getName())
+ .collect(java.util.stream.Collectors.toList());
+ for (TMetaScanRange metaScanRange : tvf.getMetaScanRanges(requiredFileds)) {
+ TScanRange scanRange = new TScanRange();
+ scanRange.setMetaScanRange(metaScanRange);
+
+ TScanRangeLocation location = new TScanRangeLocation();
+ Backend backend = backendPolicy.getNextBe();
+ location.setBackendId(backend.getId());
+ location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+
+ TScanRangeLocations locations = new TScanRangeLocations();
+ locations.addToLocations(location);
+ locations.setScanRange(scanRange);
+
+ scanRangeLocations.add(locations);
+ }
}
@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+ if (!initedScanRangeLocations) {
+ // delay createScanRangeLocations until getScanRangeLocations, so that desc has already been projected
+ createScanRangeLocations();
+ initedScanRangeLocations = true;
+ }
return scanRangeLocations;
}
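
The hunks above move scan-range creation out of init() and defer it to the first
getScanRangeLocations() call, so that the tuple descriptor has already been projected
down to materialized slots before ranges are built. Stripped of Doris thrift types, the
lazy-initialization pattern is (placeholder names throughout):

    import java.util.ArrayList;
    import java.util.List;

    // Placeholder standing in for the real node and TScanRangeLocations.
    public class LazyScanNodeSketch {
        private boolean inited = false;
        private final List<String> scanRangeLocations = new ArrayList<>();

        // Formerly invoked from init(); now deferred until the planner has
        // finished projecting the tuple descriptor.
        private void createScanRangeLocations() {
            scanRangeLocations.add("range built after projection");
        }

        public List<String> getScanRangeLocations() {
            if (!inited) {
                createScanRangeLocations();
                inited = true;
            }
            return scanRangeLocations;
        }

        public static void main(String[] args) {
            LazyScanNodeSketch node = new LazyScanNodeSketch();
            System.out.println(node.getScanRangeLocations()); // built on first access
            System.out.println(node.getScanRangeLocations()); // reused, not rebuilt
        }
    }
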
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
index 1342a5d9091..8be995beaef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
@@ -37,10 +37,10 @@ public class IcebergMeta extends TableValuedFunction {
super("iceberg_meta", properties);
}
- public static IcebergMeta createSnapshots(List<String> nameParts) {
+ public static IcebergMeta createIcebergMeta(List<String> nameParts, String queryType) {
Map<String, String> prop = Maps.newHashMap();
prop.put(IcebergTableValuedFunction.TABLE,
Joiner.on(".").join(nameParts));
- prop.put(IcebergTableValuedFunction.QUERY_TYPE, "snapshots");
+ prop.put(IcebergTableValuedFunction.QUERY_TYPE, queryType);
return new IcebergMeta(new Properties(prop));
}
@@ -53,10 +53,10 @@ public class IcebergMeta extends TableValuedFunction {
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
- return new IcebergTableValuedFunction(arguments);
+ return IcebergTableValuedFunction.create(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build IcebergTableValuedFunction by "
- + this + ": " + t.getMessage(), t);
+ + this + ": " + t.getMessage(), t);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
index a40ec67c0dc..826e1e7c59d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
@@ -33,6 +33,7 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -103,13 +104,13 @@ public class BackendsTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.BACKENDS);
TBackendsMetadataParams backendsMetadataParams = new TBackendsMetadataParams();
backendsMetadataParams.setClusterName("");
metaScanRange.setBackendsParams(backendsMetadataParams);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
index 062cb29b193..75df6bb6221 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
@@ -17,8 +17,6 @@
package org.apache.doris.tablefunction;
-
-
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
@@ -28,6 +26,7 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -36,11 +35,11 @@ public class CatalogsTableValuedFunction extends
MetadataTableValuedFunction {
public static final String NAME = "catalogs";
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
- new Column("CatalogId", ScalarType.createType(PrimitiveType.BIGINT)),
- new Column("CatalogName", ScalarType.createStringType()),
- new Column("CatalogType", ScalarType.createStringType()),
- new Column("Property", ScalarType.createStringType()),
- new Column("Value", ScalarType.createStringType()));
+ new Column("CatalogId", ScalarType.createType(PrimitiveType.BIGINT)),
+ new Column("CatalogName", ScalarType.createStringType()),
+ new Column("CatalogType", ScalarType.createStringType()),
+ new Column("Property", ScalarType.createStringType()),
+ new Column("Value", ScalarType.createStringType()));
public CatalogsTableValuedFunction(Map<String, String> params)
throws org.apache.doris.nereids.exceptions.AnalysisException {
@@ -80,9 +79,9 @@ public class CatalogsTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.CATALOGS);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
}
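
The same mechanical adaptation repeats for every metadata TVF above and below: the
single-range getMetaScanRange() becomes getMetaScanRanges(List<String>), and
implementations that still produce exactly one range wrap it in a singleton list.
Schematically (Doris thrift types replaced with a placeholder):

    import com.google.common.collect.Lists;

    import java.util.List;

    class MetaScanRange { } // placeholder for TMetaScanRange

    abstract class MetadataTvfSketch {
        // Before: public abstract MetaScanRange getMetaScanRange();
        // After: the iceberg TVF can emit one range per file scan task and may
        // prune columns using the required-field names; everyone else returns
        // a singleton list.
        public abstract List<MetaScanRange> getMetaScanRanges(List<String> requiredFields);
    }

    class SingleRangeTvfSketch extends MetadataTvfSketch {
        @Override
        public List<MetaScanRange> getMetaScanRanges(List<String> requiredFields) {
            MetaScanRange range = new MetaScanRange();
            return Lists.newArrayList(range); // the singleton-list wrapping seen in each hunk
        }
    }
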
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
index 66ac253e88d..d1b176f35a1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
@@ -32,6 +32,7 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -87,13 +88,13 @@ public class FrontendsDisksTableValuedFunction extends
MetadataTableValuedFuncti
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.FRONTENDS_DISKS);
TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams();
frontendsMetadataParams.setClusterName("");
metaScanRange.setFrontendsParams(frontendsMetadataParams);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
index 06d323bc66c..5acf44f4d3b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
@@ -32,6 +32,7 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -96,13 +97,13 @@ public class FrontendsTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.FRONTENDS);
TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams();
frontendsMetadataParams.setClusterName("");
metaScanRange.setFrontendsParams(frontendsMetadataParams);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index 87791df380a..b1d6c82329a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -121,7 +121,7 @@ public class HudiTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.HUDI);
// set hudi metadata params
@@ -131,7 +131,7 @@ public class HudiTableValuedFunction extends
MetadataTableValuedFunction {
hudiMetadataParams.setDatabase(hudiTableName.getDb());
hudiMetadataParams.setTable(hudiTableName.getTbl());
metaScanRange.setHudiParams(hudiMetadataParams);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index 726c4e144f5..25a3ad79342 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -20,28 +20,35 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TIcebergMetadataParams;
-import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.SerializationUtil;
import java.util.List;
import java.util.Map;
/**
- * The Implement of table valued function
+ * The class of table valued function for iceberg metadata.
* iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots").
*/
public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
@@ -52,36 +59,14 @@ public class IcebergTableValuedFunction extends
MetadataTableValuedFunction {
private static final ImmutableSet<String> PROPERTIES_SET =
ImmutableSet.of(TABLE, QUERY_TYPE);
- private static final ImmutableList<Column> SCHEMA_SNAPSHOT = ImmutableList.of(
- new Column("committed_at", PrimitiveType.DATETIMEV2, false),
- new Column("snapshot_id", PrimitiveType.BIGINT, false),
- new Column("parent_id", PrimitiveType.BIGINT, false),
- new Column("operation", PrimitiveType.STRING, false),
- // todo: compress manifest_list string
- new Column("manifest_list", PrimitiveType.STRING, false),
- new Column("summary", PrimitiveType.STRING, false));
+ private final String queryType;
+ private final Table sysTable;
+ private final List<Column> schema;
+ private final Map<String, String> hadoopProps;
+ private final PreExecutionAuthenticator preExecutionAuthenticator;
-
- private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
-
- static {
- ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
- for (int i = 0; i < SCHEMA_SNAPSHOT.size(); i++) {
- builder.put(SCHEMA_SNAPSHOT.get(i).getName().toLowerCase(), i);
- }
- COLUMN_TO_INDEX = builder.build();
- }
-
- public static Integer getColumnIndexFromColumnName(String columnName) {
- return COLUMN_TO_INDEX.get(columnName.toLowerCase());
- }
-
- private TIcebergQueryType queryType;
-
- // here tableName represents the name of a table in Iceberg.
- private final TableName icebergTableName;
-
- public IcebergTableValuedFunction(Map<String, String> params) throws AnalysisException {
+ public static IcebergTableValuedFunction create(Map<String, String> params)
+ throws AnalysisException {
Map<String, String> validParams = Maps.newHashMap();
for (String key : params.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
@@ -91,31 +76,57 @@ public class IcebergTableValuedFunction extends
MetadataTableValuedFunction {
validParams.put(key.toLowerCase(), params.get(key));
}
String tableName = validParams.get(TABLE);
- String queryTypeString = validParams.get(QUERY_TYPE);
- if (tableName == null || queryTypeString == null) {
+ String queryType = validParams.get(QUERY_TYPE);
+ if (tableName == null || queryType == null) {
throw new AnalysisException("Invalid iceberg metadata query");
}
+ // TODO: support these system tables in the future
+ if (queryType.equalsIgnoreCase("all_manifests") || queryType.equalsIgnoreCase("position_deletes")) {
+ throw new AnalysisException("SysTable " + queryType + " is not supported yet");
+ }
String[] names = tableName.split("\\.");
if (names.length != 3) {
throw new AnalysisException("The iceberg table name contains the catalogName, databaseName, and tableName");
}
- this.icebergTableName = new TableName(names[0], names[1], names[2]);
+ TableName icebergTableName = new TableName(names[0], names[1], names[2]);
// check auth
if (!Env.getCurrentEnv().getAccessManager()
- .checkTblPriv(ConnectContext.get(), this.icebergTableName, PrivPredicate.SELECT)) {
+ .checkTblPriv(ConnectContext.get(), icebergTableName, PrivPredicate.SELECT)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
- this.icebergTableName.getDb() + ": " + this.icebergTableName.getTbl());
- }
- try {
- this.queryType = TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
- } catch (IllegalArgumentException e) {
- throw new AnalysisException("Unsupported iceberg metadata query type: " + queryType);
+ icebergTableName.getDb() + ": " + icebergTableName.getTbl());
}
+ return new IcebergTableValuedFunction(icebergTableName, queryType);
}
- public TIcebergQueryType getIcebergQueryType() {
- return queryType;
+ public IcebergTableValuedFunction(TableName icebergTableName, String queryType)
+ throws AnalysisException {
+ this.queryType = queryType;
+ CatalogIf<?> catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(icebergTableName.getCtl());
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new AnalysisException("Catalog " + icebergTableName.getCtl() + " is not an external catalog");
+ }
+ ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
+ hadoopProps = externalCatalog.getCatalogProperty().getHadoopProperties();
+ preExecutionAuthenticator = externalCatalog.getPreExecutionAuthenticator();
+ Table table;
+ try {
+ table = preExecutionAuthenticator.execute(() -> {
+ return IcebergUtils.getIcebergTable(externalCatalog, icebergTableName.getDb(),
+ icebergTableName.getTbl());
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
+ }
+ if (table == null) {
+ throw new AnalysisException("Iceberg table " + icebergTableName + " does not exist");
+ }
+ MetadataTableType tableType = MetadataTableType.from(queryType);
+ if (tableType == null) {
+ throw new AnalysisException("Unrecognized queryType for iceberg metadata: " + queryType);
+ }
+ this.sysTable = MetadataTableUtils.createMetadataTableInstance(table, tableType);
+ this.schema = IcebergUtils.parseSchema(sysTable.schema());
}
@Override
@@ -124,36 +135,36 @@ public class IcebergTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
- TMetaScanRange metaScanRange = new TMetaScanRange();
- metaScanRange.setMetadataType(TMetadataType.ICEBERG);
- // set iceberg metadata params
- TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
- icebergMetadataParams.setIcebergQueryType(queryType);
- icebergMetadataParams.setCatalog(icebergTableName.getCtl());
- icebergMetadataParams.setDatabase(icebergTableName.getDb());
- icebergMetadataParams.setTable(icebergTableName.getTbl());
- metaScanRange.setIcebergParams(icebergMetadataParams);
- return metaScanRange;
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ List<TMetaScanRange> scanRanges = Lists.newArrayList();
+ CloseableIterable<FileScanTask> tasks;
+ try {
+ tasks = preExecutionAuthenticator.execute(() -> {
+ return sysTable.newScan().select(requiredFileds).planFiles();
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
+ }
+ for (FileScanTask task : tasks) {
+ TMetaScanRange metaScanRange = new TMetaScanRange();
+ metaScanRange.setMetadataType(TMetadataType.ICEBERG);
+ // set iceberg metadata params
+ TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
+ icebergMetadataParams.setHadoopProps(hadoopProps);
+ icebergMetadataParams.setSerializedTask(SerializationUtil.serializeToBase64(task));
+ metaScanRange.setIcebergParams(icebergMetadataParams);
+ scanRanges.add(metaScanRange);
+ }
+ return scanRanges;
}
@Override
public String getTableName() {
- return "IcebergMetadataTableValuedFunction";
+ return "IcebergTableValuedFunction<" + queryType + ">";
}
- /**
- * The tvf can register columns of metadata table
- * The data is provided by getIcebergMetadataTable in FrontendService
- *
- * @return metadata columns
- * @see org.apache.doris.service.FrontendServiceImpl
- */
@Override
public List<Column> getTableColumns() {
- if (queryType == TIcebergQueryType.SNAPSHOTS) {
- return SCHEMA_SNAPSHOT;
- }
- return Lists.newArrayList();
+ return schema;
}
}
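
The rewritten getMetaScanRanges() above is the heart of the feature: the FE plans the
Iceberg metadata table itself and ships each FileScanTask to a BE as a base64 string,
which the new JNI scanner presumably decodes with SerializationUtil.deserializeFromBase64.
The Iceberg-side calls can be exercised standalone; a rough sketch against a
HadoopTables-loaded table (the warehouse path and selected column are hypothetical):

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.MetadataTableType;
    import org.apache.iceberg.MetadataTableUtils;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.util.SerializationUtil;

    public class MetadataTableScanSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical warehouse path; Doris instead resolves the table via
            // IcebergUtils.getIcebergTable(...) under the catalog's authenticator.
            Table base = new HadoopTables(new Configuration()).load("/tmp/warehouse/db/tbl");

            MetadataTableType type = MetadataTableType.from("snapshots"); // null if unknown
            Table sysTable = MetadataTableUtils.createMetadataTableInstance(base, type);

            try (CloseableIterable<FileScanTask> tasks =
                    sysTable.newScan().select(Collections.singletonList("snapshot_id")).planFiles()) {
                for (FileScanTask task : tasks) {
                    // The patch wraps each serialized task in its own TMetaScanRange.
                    String serialized = SerializationUtil.serializeToBase64(task);
                    System.out.println(serialized.substring(0, Math.min(32, serialized.length())) + "...");
                }
            }
        }
    }
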
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index 81823d17d4f..b5d0489d30c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
@@ -98,14 +99,14 @@ public class JobsTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.JOBS);
TJobsMetadataParams jobParam = new TJobsMetadataParams();
jobParam.setType(jobType.name());
jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setJobsParams(jobParam);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index cebda862a83..468f7e0fe5f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -42,7 +42,6 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.FrontendsProcNode;
import org.apache.doris.common.proc.PartitionsProcDir;
import org.apache.doris.common.util.DebugUtil;
@@ -86,8 +85,6 @@ import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.THudiMetadataParams;
import org.apache.doris.thrift.THudiQueryType;
-import org.apache.doris.thrift.TIcebergMetadataParams;
-import org.apache.doris.thrift.TIcebergQueryType;
import org.apache.doris.thrift.TJobsMetadataParams;
import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
import org.apache.doris.thrift.TMetadataTableRequestParams;
@@ -112,15 +109,12 @@ import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.iceberg.Snapshot;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.jetbrains.annotations.NotNull;
import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@@ -226,9 +220,6 @@ public class MetadataGenerator {
TMetadataTableRequestParams params = request.getMetadaTableParams();
TMetadataType metadataType = request.getMetadaTableParams().getMetadataType();
switch (metadataType) {
- case ICEBERG:
- result = icebergMetadataResult(params);
- break;
case HUDI:
result = hudiMetadataResult(params);
break;
@@ -336,56 +327,6 @@ public class MetadataGenerator {
return result;
}
- private static TFetchSchemaTableDataResult icebergMetadataResult(TMetadataTableRequestParams params) {
- if (!params.isSetIcebergMetadataParams()) {
- return errorResult("Iceberg metadata params is not set.");
- }
-
- TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams();
- TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType();
- IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
- List<TRow> dataBatch = Lists.newArrayList();
- TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-
- switch (icebergQueryType) {
- case SNAPSHOTS:
- List<Snapshot> snapshotList;
- try {
- snapshotList = icebergMetadataCache.getSnapshotList(icebergMetadataParams);
- } catch (UserException e) {
- return errorResult(e.getMessage());
- }
- for (Snapshot snapshot : snapshotList) {
- TRow trow = new TRow();
- LocalDateTime committedAt = LocalDateTime.ofInstant(Instant.ofEpochMilli(
- snapshot.timestampMillis()), TimeUtils.getTimeZone().toZoneId());
- long encodedDatetime = TimeUtils.convertToDateTimeV2(committedAt.getYear(),
- committedAt.getMonthValue(),
- committedAt.getDayOfMonth(), committedAt.getHour(), committedAt.getMinute(),
- committedAt.getSecond(), committedAt.getNano() / 1000);
-
- trow.addToColumnValue(new TCell().setLongVal(encodedDatetime));
- trow.addToColumnValue(new TCell().setLongVal(snapshot.snapshotId()));
- if (snapshot.parentId() == null) {
- trow.addToColumnValue(new TCell().setLongVal(-1L));
- } else {
- trow.addToColumnValue(new TCell().setLongVal(snapshot.parentId()));
- }
- trow.addToColumnValue(new TCell().setStringVal(snapshot.operation()));
- trow.addToColumnValue(new TCell().setStringVal(snapshot.manifestListLocation()));
- trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(snapshot.summary())));
-
- dataBatch.add(trow);
- }
- break;
- default:
- return errorResult("Unsupported iceberg inspect type: " + icebergQueryType);
- }
- result.setDataBatch(dataBatch);
- result.setStatus(new TStatus(TStatusCode.OK));
- return result;
- }
-
private static TFetchSchemaTableDataResult hudiMetadataResult(TMetadataTableRequestParams params) {
if (!params.isSetHudiMetadataParams()) {
return errorResult("Hudi metadata params is not set.");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 074f3d66c96..92b30b0347b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -27,6 +27,8 @@ import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
+import java.util.List;
+
public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf {
public static Integer getColumnIndexFromColumnName(TMetadataType type,
String columnName,
@@ -39,8 +41,6 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
return FrontendsTableValuedFunction.getColumnIndexFromColumnName(columnName);
case FRONTENDS_DISKS:
return FrontendsDisksTableValuedFunction.getColumnIndexFromColumnName(columnName);
- case ICEBERG:
- return IcebergTableValuedFunction.getColumnIndexFromColumnName(columnName);
case HUDI:
return HudiTableValuedFunction.getColumnIndexFromColumnName(columnName);
case CATALOGS:
@@ -60,7 +60,7 @@ public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf
public abstract TMetadataType getMetadataType();
- public abstract TMetaScanRange getMetaScanRange();
+ public abstract List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds);
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index 02002033bbe..c40a8d4716d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -29,6 +29,7 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -105,7 +106,7 @@ public class MvInfosTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() start");
}
@@ -118,7 +119,7 @@ public class MvInfosTableValuedFunction extends
MetadataTableValuedFunction {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() end");
}
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
index 59efe9fcefd..e0a0d4dc649 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
@@ -145,7 +145,7 @@ public class PartitionValuesTableValuedFunction extends
MetadataTableValuedFunct
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() start");
}
@@ -156,7 +156,7 @@ public class PartitionValuesTableValuedFunction extends
MetadataTableValuedFunct
partitionParam.setDatabase(databaseName);
partitionParam.setTable(tableName);
metaScanRange.setPartitionValuesParams(partitionParam);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
index 00169ad555f..3ffb77cdbc6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
@@ -43,6 +43,7 @@ import org.apache.doris.thrift.TPartitionsMetadataParams;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -210,7 +211,7 @@ public class PartitionsTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() start");
}
@@ -224,7 +225,7 @@ public class PartitionsTableValuedFunction extends
MetadataTableValuedFunction {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() end");
}
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 133c1885a84..5485c33c033 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -57,7 +57,7 @@ public abstract class TableValuedFunctionIf {
case LocalTableValuedFunction.NAME:
return new LocalTableValuedFunction(params);
case IcebergTableValuedFunction.NAME:
- return new IcebergTableValuedFunction(params);
+ return IcebergTableValuedFunction.create(params);
case HudiTableValuedFunction.NAME:
return new HudiTableValuedFunction(params);
case BackendsTableValuedFunction.NAME:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index 1895fd53e14..5d60adbeacf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TTasksMetadataParams;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
@@ -98,14 +99,14 @@ public class TasksTableValuedFunction extends
MetadataTableValuedFunction {
}
@Override
- public TMetaScanRange getMetaScanRange() {
+ public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.TASKS);
TTasksMetadataParams taskParam = new TTasksMetadataParams();
taskParam.setType(jobType.name());
taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setTasksParams(taskParam);
- return metaScanRange;
+ return Lists.newArrayList(metaScanRange);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 1032d2ce58f..251f4d8f6b6 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -17,7 +17,123 @@
package org.apache.doris.datasource.iceberg;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.transforms.Days;
+import org.apache.iceberg.transforms.Hours;
+import org.apache.iceberg.transforms.Months;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.List;
+import java.util.Map;
+
public class IcebergExternalTableTest {
- // branch3.1 need pick by other pr
+ private org.apache.iceberg.Table icebergTable;
+ private PartitionSpec spec;
+ private PartitionField field;
+ private Schema schema;
+ private IcebergExternalCatalog mockCatalog;
+
+ @BeforeEach
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ icebergTable = Mockito.mock(org.apache.iceberg.Table.class);
+ spec = Mockito.mock(PartitionSpec.class);
+ field = Mockito.mock(PartitionField.class);
+ schema = Mockito.mock(Schema.class);
+ mockCatalog = Mockito.mock(IcebergExternalCatalog.class);
+ }
+
+ @Test
+ public void testIsSupportedPartitionTable() {
+ IcebergExternalDatabase database = new IcebergExternalDatabase(mockCatalog, 1L, "2", "2");
+ IcebergExternalTable table = new IcebergExternalTable(1, "1", "1", mockCatalog, database);
+
+ // Create a spy to be able to mock the getIcebergTable method and the makeSureInitialized method
+ IcebergExternalTable spyTable = Mockito.spy(table);
+ Mockito.doReturn(icebergTable).when(spyTable).getIcebergTable();
+ // Simulate the makeSureInitialized method as a no-op to avoid calling the parent class implementation
+ Mockito.doNothing().when(spyTable).makeSureInitialized();
+
+ Map<Integer, PartitionSpec> specs = Maps.newHashMap();
+
+ // Test null
+ specs.put(0, null);
+ Mockito.when(icebergTable.specs()).thenReturn(specs);
+
+ Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+ Assertions.assertFalse(spyTable.isValidRelatedTable());
+
+ Mockito.verify(icebergTable, Mockito.times(1)).specs();
+ Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+ Assertions.assertFalse(spyTable.validRelatedTableCache());
+
+ // Test spec fields are empty.
+ specs.put(0, spec);
+ spyTable.setIsValidRelatedTableCached(false);
+ Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+
+ Mockito.when(icebergTable.specs()).thenReturn(specs);
+ List<PartitionField> fields = Lists.newArrayList();
+ Mockito.when(spec.fields()).thenReturn(fields);
+
+ Assertions.assertFalse(spyTable.isValidRelatedTable());
+ Mockito.verify(spec, Mockito.times(1)).fields();
+ Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+ Assertions.assertFalse(spyTable.validRelatedTableCache());
+
+ // Test spec fields are more than 1.
+ specs.put(0, spec);
+ spyTable.setIsValidRelatedTableCached(false);
+ Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+
+ Mockito.when(icebergTable.specs()).thenReturn(specs);
+ fields.add(null);
+ fields.add(null);
+ Mockito.when(spec.fields()).thenReturn(fields);
+
+ Assertions.assertFalse(spyTable.isValidRelatedTable());
+ Mockito.verify(spec, Mockito.times(2)).fields();
+ Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+ Assertions.assertFalse(spyTable.validRelatedTableCache());
+ fields.clear();
+
+ // Test true
+ fields.add(field);
+ spyTable.setIsValidRelatedTableCached(false);
+ Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+
+ Mockito.when(icebergTable.schema()).thenReturn(schema);
+ Mockito.when(schema.findColumnName(ArgumentMatchers.anyInt())).thenReturn("col1");
+ Mockito.when(field.transform()).thenReturn(new Hours());
+ Mockito.when(field.sourceId()).thenReturn(1);
+
+ Assertions.assertTrue(spyTable.isValidRelatedTable());
+ Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+ Assertions.assertTrue(spyTable.validRelatedTableCache());
+ Mockito.verify(schema, Mockito.times(1)).findColumnName(ArgumentMatchers.anyInt());
+
+ Mockito.when(field.transform()).thenReturn(new Days());
+ Mockito.when(field.sourceId()).thenReturn(1);
+ spyTable.setIsValidRelatedTableCached(false);
+ Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+ Assertions.assertTrue(spyTable.isValidRelatedTable());
+
+ Mockito.when(field.transform()).thenReturn(new Months());
+ Mockito.when(field.sourceId()).thenReturn(1);
+ spyTable.setIsValidRelatedTableCached(false);
+ Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+ Assertions.assertTrue(spyTable.isValidRelatedTable());
+ Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+ Assertions.assertTrue(spyTable.validRelatedTableCache());
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
deleted file mode 100644
index 8ae51a61f46..00000000000
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg.source;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.jmockit.Deencapsulation;
-import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
-import org.apache.doris.datasource.iceberg.IcebergUtils;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.thrift.TPushAggOp;
-
-import com.google.common.collect.Lists;
-import mockit.Expectations;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
-import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.GenericManifestFile;
-import org.apache.iceberg.ManifestContent;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopTableOperations;
-import org.apache.iceberg.io.CloseableIterable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class IcebergScanNodeTest {
-
- @Mocked
- HadoopTableOperations hadoopTableOperations;
- @Mocked
- Snapshot snapshot;
-
- @Test
- public void testIsBatchMode() {
- SessionVariable sessionVariable = new SessionVariable();
- IcebergScanNode icebergScanNode = new IcebergScanNode(new PlanNodeId(1), new TupleDescriptor(new TupleId(1)), sessionVariable);
-
- new Expectations(icebergScanNode) {{
- icebergScanNode.getPushDownAggNoGroupingOp();
- result = TPushAggOp.COUNT;
- icebergScanNode.getCountFromSnapshot();
- result = 1L;
- }};
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- BaseTable mockTable = new BaseTable(hadoopTableOperations, "mockTable");
- new Expectations(icebergScanNode) {{
- icebergScanNode.getPushDownAggNoGroupingOp();
- result = TPushAggOp.NONE;
- Deencapsulation.setField(icebergScanNode, "icebergTable", mockTable);
- }};
- TableScan tableScan = mockTable.newScan();
- new Expectations(mockTable) {{
- mockTable.currentSnapshot();
- result = null;
- icebergScanNode.createTableScan();
- result = tableScan;
- }};
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- new Expectations(mockTable) {{
- mockTable.currentSnapshot();
- result = snapshot;
- }};
- new Expectations(sessionVariable) {{
- sessionVariable.getEnableExternalTableBatchMode();
- result = false;
- }};
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
-
- new Expectations(sessionVariable) {{
- sessionVariable.getEnableExternalTableBatchMode();
- result = true;
- }};
- new Expectations(icebergScanNode) {{
- Deencapsulation.setField(icebergScanNode, "preExecutionAuthenticator", new PreExecutionAuthenticator());
- }};
- new Expectations() {{
- sessionVariable.getNumFilesInBatchMode();
- result = 1024;
- }};
-
- mockManifestFile("p", 10, 0);
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 0, 10);
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 10, 10);
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 1024, 0);
- Assert.assertTrue(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 0, 1024);
- Assert.assertTrue(icebergScanNode.isBatchMode());
-
- new Expectations() {{
- sessionVariable.getNumFilesInBatchMode();
- result = 100;
- }};
-
- mockManifestFile("p", 10, 0);
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 0, 10);
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 10, 10);
- Assert.assertFalse(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 0, 100);
- Assert.assertTrue(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 100, 0);
- Assert.assertTrue(icebergScanNode.isBatchMode());
-
- mockManifestFile("p", 10, 90);
- Assert.assertTrue(icebergScanNode.isBatchMode());
- }
-
- private void mockManifestFile(String path, int addedFileCount, int existingFileCount) {
- new MockUp<IcebergUtils>() {
- @Mock
- CloseableIterable<ManifestFile> getMatchingManifest(List<ManifestFile> dataManifests,
- Map<Integer, PartitionSpec> specsById,
- Expression dataFilte) {
- return CloseableIterable.withNoopClose(new ArrayList<ManifestFile>() {{
- add(genManifestFile(path, addedFileCount, existingFileCount));
- }}
- );
- }
- };
- }
-
- private ManifestFile genManifestFile(String path, int addedFileCount, int existingFileCount) {
- return new GenericManifestFile(
- path,
- 10, // length
- 1, // specId
- ManifestContent.DATA,
- 1, // sequenceNumber
- 1, // minSeqNumber
- 1L, // snapshotid
- addedFileCount,
- 1,
- existingFileCount,
- 1,
- 0, // deleteFilesCount
- 0,
- Lists.newArrayList(),
- null
- );
- }
-}
diff --git a/fe/pom.xml b/fe/pom.xml
index 7c4dbb0551c..bf2fd82635e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -222,8 +222,9 @@ under the License.
<module>be-java-extensions</module>
</modules>
<properties>
- <doris.hive.catalog.shade.version>2.1.4</doris.hive.catalog.shade.version>
- <avro.version>1.11.4</avro.version>
+ <doris.hive.catalog.shade.version>3.0.1</doris.hive.catalog.shade.version>
+ <!-- iceberg 1.9.1 depends on avro 1.12 -->
+ <avro.version>1.12.0</avro.version>
<parquet.version>1.13.1</parquet.version>
<spark.version>3.4.3</spark.version>
<hudi.version>0.15.0</hudi.version>
@@ -318,7 +319,7 @@ under the License.
<!-- ATTN: avro version must be consistent with Iceberg version -->
<!-- Please modify iceberg.version and avro.version together,
you can find avro version info in iceberg mvn repository -->
- <iceberg.version>1.6.1</iceberg.version>
+ <iceberg.version>1.9.1</iceberg.version>
<maxcompute.version>0.49.0-public</maxcompute.version>
<arrow.version>17.0.0</arrow.version>
<presto.hadoop.version>2.7.4-11</presto.hadoop.version>
@@ -365,7 +366,7 @@ under the License.
<quartz.version>2.3.2</quartz.version>
<aircompressor.version>0.27</aircompressor.version>
<!-- paimon -->
- <paimon.version>1.0.1</paimon.version>
+ <paimon.version>1.1.1</paimon.version>
<disruptor.version>3.4.4</disruptor.version>
<!-- arrow flight sql -->
<arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 33f8f74e14f..d8b3b1cec37 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -541,10 +541,8 @@ struct TDataGenScanRange {
struct TIcebergMetadataParams {
- 1: optional Types.TIcebergQueryType iceberg_query_type
- 2: optional string catalog
- 3: optional string database
- 4: optional string table
+ 1: optional string serialized_task
+ 2: optional map<string, string> hadoop_props
}
struct THudiMetadataParams {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 4182330231f..e59a36e964e 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -745,10 +745,6 @@ enum TMetadataType {
HUDI,
}
-enum TIcebergQueryType {
- SNAPSHOTS
-}
-
enum THudiQueryType {
TIMELINE
}
diff --git
a/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out
b/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out
index dde27c2049d..7dc0ea69f56 100644
Binary files
a/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out and
b/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out
differ
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
index 357f5da9708..16d852d78e3 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
@@ -24,7 +24,7 @@ suite("test_iceberg_sys_table",
"p0,external,doris,external_docker,external_dock
}
String catalog_name = "test_iceberg_systable_ctl"
- String db_name = "test_iceberg_systable_db"
+ String db_name = "test_db"
String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -41,43 +41,314 @@ suite("test_iceberg_sys_table",
"p0,external,doris,external_docker,external_dock
);"""
sql """switch ${catalog_name}"""
-
- sql """drop database if exists ${db_name} force"""
- sql """create database ${db_name}"""
sql """use ${db_name}"""
- sql """ create table test_iceberg_systable_tbl1 (id int) """
- sql """insert into test_iceberg_systable_tbl1 values(1);"""
- qt_sql_tbl1 """select * from test_iceberg_systable_tbl1"""
- qt_sql_tbl1_systable """select count(*) from test_iceberg_systable_tbl1\$snapshots"""
- List<List<Object>> res1 = sql """select * from test_iceberg_systable_tbl1\$snapshots"""
- List<List<Object>> res2 = sql """select * from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
- "query_type" = "snapshots");
- """
- assertEquals(res1.size(), res2.size());
- for (int i = 0; i < res1.size(); i++) {
- for (int j = 0; j < res1[i].size(); j++) {
- assertEquals(res1[i][j], res2[i][j]);
+ def test_iceberg_systable = { tblName, systableType ->
+ def systableName = "${tblName}\$${systableType}"
+
+ order_qt_desc_systable1 """desc ${systableName}"""
+ order_qt_desc_systable2 """desc ${db_name}.${systableName}"""
+ order_qt_desc_systable3 """desc ${catalog_name}.${db_name}.${systableName}"""
+
+ List<List<Object>> schema = sql """desc ${systableName}"""
+ String key = String.valueOf(schema[1][0])
+
+ order_qt_tbl1_systable """select * from ${systableName}"""
+ order_qt_tbl1_systable_select """select ${key} from ${systableName}"""
+ order_qt_tbl1_systable_count """select count(*) from ${systableName}"""
+ order_qt_tbl1_systable_count_select """select count(${key}) from ${systableName}"""
+
+ List<List<Object>> res1 = sql """select ${key} from ${systableName} order by ${key}"""
+ List<List<Object>> res2 = sql """select ${key} from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${tblName}",
+ "query_type" = "${systableType}") order by ${key};
+ """
+ assertEquals(res1.size(), res2.size());
+ for (int i = 0; i < res1.size(); i++) {
+ for (int j = 0; j < res1[i].size(); j++) {
+ assertEquals(res1[i][j], res2[i][j]);
+ }
+ }
+
+ if (res1.isEmpty()) {
+ return
}
+
+ String value = String.valueOf(res1[0][0])
+ order_qt_tbl1_systable_where """
+ select * from ${systableName} where ${key}="${value}";
+ """
+ order_qt_tbl1_systable_where_count """
+ select count(*) from ${systableName} where ${key}="${value}";
+ """
+ order_qt_systable_where2 """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${tblName}",
+ "query_type" = "${systableType}") where ${key}="${value}";
+ """
}
- String snapshot_id = String.valueOf(res1[0][1]);
- qt_sql_tbl1_systable_where """
- select count(*) from test_iceberg_systable_tbl1\$snapshots where snapshot_id=${snapshot_id};
- """
- qt_tbl1_systable_where2 """select count(*) from iceberg_meta(
- "table" = "${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
- "query_type" = "snapshots") where snapshot_id="${snapshot_id}";
- """
+ def test_systable_entries = { table, systableType ->
+ def systableName = "${table}\$${systableType}"
+ order_qt_desc_entries """desc ${systableName}"""
- sql """insert into test_iceberg_systable_tbl1 values(2);"""
- order_qt_sql_tbl12 """select * from test_iceberg_systable_tbl1"""
- qt_sql_tbl1_systable2 """select count(*) from test_iceberg_systable_tbl1\$snapshots"""
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+ List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+ order_qt_select_entries """select status, sequence_number, file_sequence_number, readable_metrics from ${systableName}"""
+ order_qt_select_entries_count """select count(*) from ${systableName}"""
+ order_qt_select_entries_where """select status, sequence_number, file_sequence_number, readable_metrics from ${systableName} where status="0";"""
+ order_qt_select_entries_where_count """select count(status) from ${systableName} where status="0";"""
+ }
+
+ def test_systable_files = { table, systableType ->
+ def systableName = "${table}\$${systableType}"
+ order_qt_desc_files """desc ${systableName}"""
- qt_desc_systable1 """desc test_iceberg_systable_tbl1\$snapshots"""
- qt_desc_systable2 """desc ${db_name}.test_iceberg_systable_tbl1\$snapshots"""
- qt_desc_systable3 """desc ${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots"""
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+ List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+ order_qt_select_files """select content, file_format, record_count, lower_bounds, upper_bounds, readable_metrics from ${systableName}"""
+ order_qt_select_files_count """select count(*) from ${systableName}"""
+ order_qt_select_files_where """select content, file_format, record_count, lower_bounds, upper_bounds, readable_metrics from ${systableName} where content="0";"""
+ order_qt_select_files_where_count """select count(content) from ${systableName} where content="0";"""
+ }
+
+ def test_systable_history = { table ->
+ def systableName = "${table}\$history"
+ order_qt_desc_history """desc ${systableName}"""
+
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+ List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+ order_qt_select_history_count """select count(*) from ${systableName}"""
+
+ List<List<Object>> res1 = sql """select * from ${systableName} order by snapshot_id"""
+ List<List<Object>> res2 = sql """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${table}",
+ "query_type" = "history") order by snapshot_id"""
+ assertEquals(res1.size(), res2.size());
+ for (int i = 0; i < res1.size(); i++) {
+ for (int j = 0; j < res1[i].size(); j++) {
+ assertEquals(res1[i][j], res2[i][j]);
+ }
+ }
+ }
+
+ def test_systable_metadata_log_entries = { table ->
+ def systableName = "${table}\$metadata_log_entries"
+ order_qt_desc_metadata_log_entries """desc ${systableName}"""
+
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+ List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+ order_qt_select_metadata_log_entries_count """select count(*) from ${systableName}"""
+
+ List<List<Object>> res1 = sql """select * from ${systableName} order by timestamp"""
+ List<List<Object>> res2 = sql """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${table}",
+ "query_type" = "metadata_log_entries") order by timestamp"""
+ assertEquals(res1.size(), res2.size());
+ for (int i = 0; i < res1.size(); i++) {
+ for (int j = 0; j < res1[i].size(); j++) {
+ assertEquals(res1[i][j], res2[i][j]);
+ }
+ }
+ }
+
+ def test_systable_snapshots = { table ->
+ def systableName = "${table}\$snapshots"
+ order_qt_desc_snapshots """desc ${systableName}"""
+
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+ List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+ order_qt_select_snapshots """select operation from ${systableName}"""
+        order_qt_select_snapshots_count """select count(*) from ${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order by committed_at"""
+ List<List<Object>> res2 = sql """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${table}",
+ "query_type" = "snapshots") order by committed_at"""
+ assertEquals(res1.size(), res2.size());
+ for (int i = 0; i < res1.size(); i++) {
+ for (int j = 0; j < res1[i].size(); j++) {
+ assertEquals(res1[i][j], res2[i][j]);
+ }
+ }
+ }
+
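+    // $refs: checks the name and type columns (branches and tags), and
+    // cross-checks rows against iceberg_meta() with query_type = "refs".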
+ def test_systable_refs = { table ->
+ def systableName = "${table}\$refs"
+ order_qt_desc_refs """desc ${systableName}"""
+
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+ order_qt_select_refs """select name, type from ${systableName}"""
+ order_qt_select_refs_count """select count(*) from ${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order by snapshot_id"""
+ List<List<Object>> res2 = sql """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${table}",
+ "query_type" = "refs") order by snapshot_id"""
+ assertEquals(res1.size(), res2.size());
+ for (int i = 0; i < res1.size(); i++) {
+ for (int j = 0; j < res1[i].size(); j++) {
+ assertEquals(res1[i][j], res2[i][j]);
+ }
+ }
+ }
+
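+    // $manifests: rows are cross-checked against iceberg_meta() with
+    // query_type = "manifests", ordered by manifest path.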
+ def test_systable_manifests = { table ->
+ def systableName = "${table}\$manifests"
+ order_qt_desc_manifests """desc ${systableName}"""
+
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+        order_qt_select_manifests_count """select count(*) from ${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order by path"""
+ List<List<Object>> res2 = sql """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${table}",
+ "query_type" = "manifests") order by path"""
+ assertEquals(res1.size(), res2.size());
+ for (int i = 0; i < res1.size(); i++) {
+ for (int j = 0; j < res1[i].size(); j++) {
+ assertEquals(res1[i][j], res2[i][j]);
+ }
+ }
+ }
+
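+    // $partitions: only the row count is compared with iceberg_meta(); the
+    // full result set is merely checked to be selectable (see comment below).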
+ def test_systable_partitions = { table ->
+ def systableName = "${table}\$partitions"
+ order_qt_desc_partitions """desc ${systableName}"""
+
+ List<List<Object>> desc1 = sql """desc ${systableName}"""
+ List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc ${catalog_name}.${db_name}.${systableName}"""
+ assertEquals(desc1.size(), desc2.size());
+ assertEquals(desc1.size(), desc3.size());
+ for (int i = 0; i < desc1.size(); i++) {
+ for (int j = 0; j < desc1[i].size(); j++) {
+ assertEquals(desc1[i][j], desc2[i][j]);
+ assertEquals(desc1[i][j], desc3[i][j]);
+ }
+ }
+
+        order_qt_select_partitions_count """select count(*) from ${systableName}"""
+
+ List<List<Object>> res1 = sql """select * from ${systableName};"""
+ List<List<Object>> res2 = sql """select * from iceberg_meta(
+ "table" = "${catalog_name}.${db_name}.${table}",
+ "query_type" = "partitions");"""
+ assertEquals(res1.size(), res2.size());
+        // just verify that the result set can be selected successfully
+ }
+
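+    // Run the full system-table matrix for one table: all entries/files
+    // variants plus history, metadata_log_entries, snapshots, refs, manifests
+    // and partitions, followed by the not-yet-supported error cases.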
+ def test_table_systables = { table ->
+ test_systable_entries(table, "entries")
+ test_systable_entries(table, "all_entries")
+ test_systable_files(table, "files")
+ test_systable_files(table, "data_files")
+ test_systable_files(table, "delete_files")
+ test_systable_files(table, "all_files")
+ test_systable_files(table, "all_data_files")
+ test_systable_files(table, "all_delete_files")
+ test_systable_history(table)
+ test_systable_metadata_log_entries(table)
+ test_systable_snapshots(table)
+ test_systable_refs(table)
+ test_systable_manifests(table)
+ test_systable_partitions(table)
+        // TODO: these tables will be supported in the future; for now,
+        // selecting them must fail with a clear error (checked below).
+ // test_systable_all_manifests(table)
+ // test_systable_position_deletes(table)
+
+ test {
+ sql """select * from ${table}\$all_manifests"""
+ exception "SysTable all_manifests is not supported yet"
+ }
+ test {
+ sql """select * from ${table}\$position_deletes"""
+ exception "SysTable position_deletes is not supported yet"
+ }
+ }
+
+ test_table_systables("test_iceberg_systable_unpartitioned")
+ test_table_systables("test_iceberg_systable_partitioned")
+
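+    // tbl1: each insert is a separate Iceberg commit, so five inserts give the
+    // $snapshots system table multiple rows for the checks below.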
+ sql """drop table if exists test_iceberg_systable_tbl1;"""
+ sql """create table test_iceberg_systable_tbl1 (id int);"""
+ sql """insert into test_iceberg_systable_tbl1 values(1);"""
+ sql """insert into test_iceberg_systable_tbl1 values(2);"""
+ sql """insert into test_iceberg_systable_tbl1 values(3);"""
+ sql """insert into test_iceberg_systable_tbl1 values(4);"""
+ sql """insert into test_iceberg_systable_tbl1 values(5);"""
String user = "test_iceberg_systable_user"
String pwd = 'C123_567p'
@@ -119,65 +390,4 @@ suite("test_iceberg_sys_table", "p0,external,doris,external_docker,external_dock
sql """select committed_at, snapshot_id, parent_id, operation from
${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots"""
}
try_sql("DROP USER ${user}")
-
- // tbl2
- sql """ create table test_iceberg_systable_tbl2 (id int) """
- sql """insert into test_iceberg_systable_tbl2 values(2);"""
- qt_sql_tbl2 """select * from test_iceberg_systable_tbl2"""
-
- sql """ create table test_iceberg_systable_tbl3 (id int) """
- sql """insert into test_iceberg_systable_tbl3 values(3);"""
- qt_sql_tbl3 """select * from test_iceberg_systable_tbl3"""
-
- // drop db with tables
- test {
- sql """drop database ${db_name}"""
- exception """is not empty"""
- }
-
-    // drop db force with tables
- sql """drop database ${db_name} force"""
-
- // refresh catalog
- sql """refresh catalog ${catalog_name}"""
- // should be empty
- test {
- sql """show tables from ${db_name}"""
- exception "Unknown database"
- }
-
- // table should be deleted
- qt_test1 """
- select * from s3(
-        "uri" = "s3://warehouse/wh/${db_name}/test_iceberg_systable_tbl1/data/*.parquet",
- "s3.endpoint"="http://${externalEnvIp}:${minio_port}",
- "s3.access_key" = "admin",
- "s3.secret_key" = "password",
- "s3.region" = "us-east-1",
- "format" = "parquet",
- "use_path_style" = "true"
- )
- """
- qt_test2 """
- select * from s3(
-        "uri" = "s3://warehouse/wh/${db_name}/test_iceberg_systable_tbl1/data/*.parquet",
- "s3.endpoint"="http://${externalEnvIp}:${minio_port}",
- "s3.access_key" = "admin",
- "s3.secret_key" = "password",
- "s3.region" = "us-east-1",
- "format" = "parquet",
- "use_path_style" = "true"
- )
- """
- qt_test3 """
- select * from s3(
-        "uri" = "s3://warehouse/wh/${db_name}/test_iceberg_systable_tbl1/data/*.parquet",
- "s3.endpoint"="http://${externalEnvIp}:${minio_port}",
- "s3.access_key" = "admin",
- "s3.secret_key" = "password",
- "s3.region" = "us-east-1",
- "format" = "parquet",
- "use_path_style" = "true"
- )
- """
}