This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7713596e6b5 [feat][paimon] support paimon system table (#52598)
7713596e6b5 is described below
commit 7713596e6b51a0e43daabc92a1e891eafc051e7c
Author: Petrichor <[email protected]>
AuthorDate: Wed Jul 9 23:50:14 2025 +0800
[feat][paimon] support paimon system table (#52598)
## Overview
This PR adds support for querying Apache Paimon system tables, enabling
users to inspect metadata and operational information about Paimon
tables directly through SQL queries.
For the complete system table reference and advanced usage patterns, see the
[Apache Paimon System Tables Documentation](https://paimon.apache.org/docs/master/concepts/system-tables/).
## Usage Examples
### Basic System Table Query
```sql
-- Query snapshot information
SELECT * FROM my_table$snapshots;
```
```sql
-- Query manifest details
SELECT * FROM my_table$manifests;
```
```sql
-- Query schema evolution
SELECT * FROM my_table$schemas;
```
### Advanced Operations
```sql
-- Filter snapshots by commit time
SELECT snapshot_id, commit_user, commit_time
FROM my_table$snapshots
WHERE commit_time > '2025-01-01'
ORDER BY commit_time DESC;
```
```sql
-- Join snapshots with schemas
SELECT s.snapshot_id, sc.fields
FROM my_table$snapshots s
JOIN my_table$schemas sc ON s.schema_id = sc.schema_id;
```
### Sample Output
```sql
mysql> SELECT * FROM my_table$snapshots;
+-------------+-----------+--------------------------------------+---------------------+-------------+-------------------------+------------------------------------------------------+------------------------------------------------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
| snapshot_id | schema_id | commit_user                          | commit_identifier   | commit_kind | commit_time             | base_manifest_list                                   | delta_manifest_list                                  | changelog_manifest_list | total_record_count | delta_record_count | changelog_record_count | watermark            |
+-------------+-----------+--------------------------------------+---------------------+-------------+-------------------------+------------------------------------------------------+------------------------------------------------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
|           1 |         0 | d7ea4996-92c7-469f-b9ff-c76525954f1c | 9223372036854775807 | APPEND      | 2025-03-04 22:48:45.575 | manifest-list-dc5490ba-420c-445a-b6f7-6962d394935c-0 | manifest-list-dc5490ba-420c-445a-b6f7-6962d394935c-1 | NULL                    |                  1 |                  1 |                      0 | -9223372036854775808 |
+-------------+-----------+--------------------------------------+---------------------+-------------+-------------------------+------------------------------------------------------+------------------------------------------------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
```
---
.../format/table/paimon_sys_table_jni_reader.cpp | 71 ++++++
.../format/table/paimon_sys_table_jni_reader.h | 65 +++++
be/src/vec/exec/scan/meta_scanner.cpp | 7 +
.../doris/paimon/PaimonSysTableJniScanner.java | 234 ++++++++++++++++++
.../org/apache/doris/paimon/PaimonTableCache.java | 43 +++-
.../doris/catalog/BuiltinTableValuedFunctions.java | 2 +
.../datasource/paimon/PaimonExternalCatalog.java | 27 +++
.../apache/doris/datasource/paimon/PaimonUtil.java | 34 ++-
.../datasource/paimon/source/PaimonScanNode.java | 35 +--
.../doris/datasource/systable/PaimonSysTable.java | 90 +++++++
.../datasource/systable/SupportedSysTables.java | 3 +-
.../expressions/functions/table/PaimonMeta.java | 70 ++++++
.../visitor/TableValuedFunctionVisitor.java | 5 +
.../tablefunction/PaimonTableValuedFunction.java | 199 ++++++++++++++++
.../doris/tablefunction/TableValuedFunctionIf.java | 2 +
gensrc/thrift/PlanNodes.thrift | 14 ++
gensrc/thrift/Types.thrift | 1 +
.../paimon/paimon_system_table.out | Bin 0 -> 5130 bytes
.../paimon/paimon_system_table.groovy | 264 +++++++++++++++++++++
19 files changed, 1132 insertions(+), 34 deletions(-)
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
new file mode 100644
index 00000000000..e6b0263ed25
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_sys_table_jni_reader.h"
+
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
+const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon.";
+
+PaimonSysTableJniReader::PaimonSysTableJniReader(
+ const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState*
state,
+ RuntimeProfile* profile, const TPaimonMetadataParams& range_params)
+ : JniReader(file_slot_descs, state, profile),
_range_params(range_params) {
+ std::vector<std::string> required_fields;
+ std::vector<std::string> required_types;
+ for (const auto& desc : _file_slot_descs) {
+ required_fields.emplace_back(desc->col_name());
+
required_types.emplace_back(JniConnector::get_jni_type_with_different_string(desc->type()));
+ }
+
+ std::map<std::string, std::string> params;
+ params["db_name"] = _range_params.db_name;
+ params["tbl_name"] = _range_params.tbl_name;
+ params["query_type"] = _range_params.query_type;
+ params["ctl_id"] = std::to_string(_range_params.ctl_id);
+ params["db_id"] = std::to_string(_range_params.db_id);
+ params["tbl_id"] = std::to_string(_range_params.tbl_id);
+ params["serialized_split"] = _range_params.serialized_split;
+ params["required_fields"] = join(required_fields, ",");
+ params["required_types"] = join(required_types, "#");
+
+ for (const auto& kv : _range_params.paimon_props) {
+ params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
+ }
+
+ for (const auto& kv : _range_params.hadoop_props) {
+ params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+ }
+
+ _jni_connector = std::make_unique<JniConnector>(
+ "org/apache/doris/paimon/PaimonSysTableJniScanner",
std::move(params), required_fields);
+}
+
+Status PaimonSysTableJniReader::init_reader(
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range) {
+ _colname_to_value_range = colname_to_value_range;
+ RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+ return _jni_connector->open(_state, _profile);
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
new file mode 100644
index 00000000000..f7b3108f5af
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+
+#include <cstddef>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/jni_reader.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+class PaimonSysTableJniReader : public JniReader {
+ ENABLE_FACTORY_CREATOR(PaimonSysTableJniReader);
+
+public:
+ static const std::string HADOOP_OPTION_PREFIX;
+ static const std::string PAIMON_OPTION_PREFIX;
+ PaimonSysTableJniReader(const std::vector<SlotDescriptor*>&
file_slot_descs,
+ RuntimeState* state, RuntimeProfile* profile,
+ const TPaimonMetadataParams& range_params);
+
+ ~PaimonSysTableJniReader() override = default;
+
+ Status init_reader(
+ const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+
+private:
+ const std::unordered_map<std::string, ColumnValueRangeType>*
_colname_to_value_range;
+ const TPaimonMetadataParams& _range_params;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/meta_scanner.cpp
b/be/src/vec/exec/scan/meta_scanner.cpp
index 843e09a4865..d64806b5e08 100644
--- a/be/src/vec/exec/scan/meta_scanner.cpp
+++ b/be/src/vec/exec/scan/meta_scanner.cpp
@@ -42,6 +42,7 @@
#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/exec/format/table/iceberg_sys_table_jni_reader.h"
+#include "vec/exec/format/table/paimon_sys_table_jni_reader.h"
namespace doris {
class RuntimeProfile;
@@ -71,6 +72,12 @@ Status MetaScanner::open(RuntimeState* state) {
const std::unordered_map<std::string, ColumnValueRangeType>
colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
+ } else if (_scan_range.meta_scan_range.metadata_type ==
TMetadataType::PAIMON) {
+ auto reader = PaimonSysTableJniReader::create_unique(
+ _tuple_desc->slots(), state, _profile,
_scan_range.meta_scan_range.paimon_params);
+ const std::unordered_map<std::string, ColumnValueRangeType>
colname_to_value_range;
+ RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
+ _reader = std::move(reader);
} else {
RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
}
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
new file mode 100644
index 00000000000..0344c8b2db4
--- /dev/null
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.paimon;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
+import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
+import org.apache.doris.paimon.PaimonTableCache.TableExt;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.TimestampType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * JNI-based scanner for reading Apache Paimon system tables
+ */
+public class PaimonSysTableJniScanner extends JniScanner {
+ private static final Logger LOG =
LoggerFactory.getLogger(PaimonSysTableJniScanner.class);
+
+ private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+ private static final String PAIMON_OPTION_PREFIX = "paimon.";
+
+ private final Map<String, String> params;
+ private final Map<String, String> hadoopOptionParams;
+ private final Map<String, String> paimonOptionParams;
+
+ private final ClassLoader classLoader;
+ private final Split paimonSplit;
+ private Table table;
+ private RecordReader<InternalRow> reader;
+ private final PaimonColumnValue columnValue = new PaimonColumnValue();
+ private List<DataType> paimonDataTypeList;
+ private List<String> paimonAllFieldNames;
+ private final PreExecutionAuthenticator preExecutionAuthenticator;
+ private RecordReader.RecordIterator<InternalRow> recordIterator = null;
+ private final long ctlId;
+ private final long dbId;
+ private final long tblId;
+ private final String dbName;
+ private final String tblName;
+ private final String queryType;
+
+ /**
+ * Constructs a new PaimonSysTableJniScanner for reading Paimon system tables.
+ */
+ public PaimonSysTableJniScanner(int batchSize, Map<String, String> params)
{
+ this.classLoader = this.getClass().getClassLoader();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("params:{}", params);
+ }
+ this.params = params;
+ String[] requiredFields = params.get("required_fields").split(",");
+ String[] requiredTypes = params.get("required_types").split("#");
+ ColumnType[] columnTypes = new ColumnType[requiredTypes.length];
+ for (int i = 0; i < requiredTypes.length; i++) {
+ columnTypes[i] = ColumnType.parseType(requiredFields[i],
requiredTypes[i]);
+ }
+ initTableInfo(columnTypes, requiredFields, batchSize);
+ this.paimonSplit =
PaimonUtils.deserialize(params.get("serialized_split"));
+ this.ctlId = Long.parseLong(params.get("ctl_id"));
+ this.dbId = Long.parseLong(params.get("db_id"));
+ this.tblId = Long.parseLong(params.get("tbl_id"));
+ this.dbName = params.get("db_name");
+ this.tblName = params.get("tbl_name");
+ this.queryType = params.get("query_type");
+ this.hadoopOptionParams = params.entrySet().stream()
+ .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+ .collect(Collectors
+ .toMap(kv1 ->
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
+ Entry::getValue));
+ this.paimonOptionParams = params.entrySet().stream()
+ .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
+ .collect(Collectors
+ .toMap(kv1 ->
kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()),
+ Entry::getValue));
+ this.preExecutionAuthenticator =
PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
+ }
+
+ @Override
+ public void open() {
+ try {
+ // When the user does not specify hive-site.xml, Paimon will look for the file from the classpath:
+ // org.apache.paimon.hive.HiveCatalog.createHiveConf:
+ // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
+ // so we need to provide a classloader, otherwise it will cause NPE.
+ Thread.currentThread().setContextClassLoader(classLoader);
+ preExecutionAuthenticator.execute(() -> {
+ initTable();
+ initReader();
+ return null;
+ });
+ resetDatetimeV2Precision();
+
+ } catch (Throwable e) {
+ LOG.warn("Failed to open paimon_scanner: {}", e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ @Override
+ protected int getNext() throws IOException {
+ try {
+ return
preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initTable() {
+ PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
+ paimonOptionParams, hadoopOptionParams, dbName, tblName,
queryType);
+ TableExt tableExt = PaimonTableCache.getTable(key);
+ Table paimonTable = tableExt.getTable();
+ if (paimonTable == null) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to get Paimon system table {%s}.{%s}${%s}. ",
+ dbName, tblName, queryType));
+ }
+ this.table = paimonTable;
+ this.paimonAllFieldNames =
PaimonUtils.getFieldNames(this.table.rowType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
+ }
+ }
+
+ private void initReader() throws IOException {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ if (this.fields.length > this.paimonAllFieldNames.size()) {
+ throw new IOException(
+ String.format(
+ "The jni reader fields' size {%s} is not matched with paimon fields' size {%s}."
+ + " Please refresh table and try again",
+ fields.length, paimonAllFieldNames.size()));
+ }
+ int[] projected = getProjected();
+ readBuilder.withProjection(projected);
+ reader =
readBuilder.newRead().executeFilter().createReader(paimonSplit);
+ paimonDataTypeList =
+ Arrays.stream(projected).mapToObj(i ->
table.rowType().getTypeAt(i)).collect(Collectors.toList());
+ }
+
+ private int[] getProjected() {
+ return
Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
+ }
+
+ private void resetDatetimeV2Precision() {
+ for (int i = 0; i < types.length; i++) {
+ if (types[i].isDateTimeV2()) {
+ // paimon support precision > 6, but it has been reset as 6 in FE
+ // try to get the right precision for datetimev2
+ int index = paimonAllFieldNames.indexOf(fields[i]);
+ if (index != -1) {
+ DataType dataType = table.rowType().getTypeAt(index);
+ if (dataType instanceof TimestampType) {
+ types[i].setPrecision(((TimestampType)
dataType).getPrecision());
+ }
+ }
+ }
+ }
+ }
+
+ private int readAndProcessNextBatch() throws IOException {
+ int rows = 0;
+ try {
+ if (recordIterator == null) {
+ recordIterator = reader.readBatch();
+ }
+
+ while (recordIterator != null) {
+ InternalRow record;
+ while ((record = recordIterator.next()) != null) {
+ columnValue.setOffsetRow(record);
+ for (int i = 0; i < fields.length; i++) {
+ columnValue.setIdx(i, types[i],
paimonDataTypeList.get(i));
+ long l = System.nanoTime();
+ appendData(i, columnValue);
+ appendDataTime += System.nanoTime() - l;
+ }
+ rows++;
+ if (rows >= batchSize) {
+ return rows;
+ }
+ }
+ recordIterator.releaseBatch();
+ recordIterator = reader.readBatch();
+ }
+ } catch (Exception e) {
+ close();
+ LOG.warn("Failed to get the next batch of paimon. "
+ + "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
+ paimonSplit, params.get("required_fields"),
paimonAllFieldNames, paimonDataTypeList, e);
+ throw new IOException(e);
+ }
+ return rows;
+ }
+}
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
index 12aac153392..e5f067af96b 100644
---
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
@@ -36,12 +36,13 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PaimonTableCache {
- private static final Logger LOG =
LoggerFactory.getLogger(PaimonTableCache.class);
// Max cache num of paimon table
public static final long max_external_schema_cache_num = 50;
// The expiration time of a cache object after last access of it.
public static final long external_cache_expire_time_minutes_after_access =
100;
+ private static final Logger LOG =
LoggerFactory.getLogger(PaimonTableCache.class);
+
private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache =
CacheBuilder.newBuilder()
.maximumSize(max_external_schema_cache_num)
.expireAfterAccess(external_cache_expire_time_minutes_after_access,
TimeUnit.MINUTES)
@@ -56,7 +57,13 @@ public class PaimonTableCache {
try {
LOG.warn("load table:{}", key);
Catalog catalog = createCatalog(key.getPaimonOptionParams(),
key.getHadoopOptionParams());
- Table table = catalog.getTable(Identifier.create(key.getDbName(),
key.getTblName()));
+ Table table;
+ if (key.getQueryType() != null) {
+ table = catalog.getTable(new Identifier(key.getDbName(),
key.getTblName(),
+ null, key.getQueryType()));
+ } else {
+ table = catalog.getTable(Identifier.create(key.getDbName(),
key.getTblName()));
+ }
return new TableExt(table, System.currentTimeMillis());
} catch (Catalog.TableNotExistException e) {
LOG.warn("failed to create paimon table ", e);
@@ -107,15 +114,16 @@ public class PaimonTableCache {
public static class PaimonTableCacheKey {
// in key
- private long ctlId;
- private long dbId;
- private long tblId;
+ private final long ctlId;
+ private final long dbId;
+ private final long tblId;
// not in key
private Map<String, String> paimonOptionParams;
private Map<String, String> hadoopOptionParams;
private String dbName;
private String tblName;
+ private String queryType;
public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
Map<String, String> paimonOptionParams,
@@ -130,6 +138,20 @@ public class PaimonTableCache {
this.tblName = tblName;
}
+ public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
+ Map<String, String> paimonOptionParams,
+ Map<String, String> hadoopOptionParams,
+ String dbName, String tblName, String queryType) {
+ this.ctlId = ctlId;
+ this.dbId = dbId;
+ this.tblId = tblId;
+ this.paimonOptionParams = paimonOptionParams;
+ this.hadoopOptionParams = hadoopOptionParams;
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.queryType = queryType;
+ }
+
public long getCtlId() {
return ctlId;
}
@@ -158,6 +180,10 @@ public class PaimonTableCache {
return tblName;
}
+ public String getQueryType() {
+ return queryType;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -167,9 +193,9 @@ public class PaimonTableCache {
return false;
}
PaimonTableCacheKey that = (PaimonTableCacheKey) o;
- return ctlId == that.ctlId
- && dbId == that.dbId
- && tblId == that.tblId;
+ return ctlId == that.ctlId && dbId == that.dbId && tblId ==
that.tblId && Objects.equal(
+ queryType,
+ that.queryType);
}
@Override
@@ -187,6 +213,7 @@ public class PaimonTableCache {
+ ", hadoopOptionParams=" + hadoopOptionParams
+ ", dbName='" + dbName + '\''
+ ", tblName='" + tblName + '\''
+ + ", queryType='" + queryType + '\''
+ '}';
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index 97239e1191d..d4bdf92696b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -31,6 +31,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
+import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta;
import
org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues;
import org.apache.doris.nereids.trees.expressions.functions.table.Partitions;
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
@@ -55,6 +56,7 @@ public class BuiltinTableValuedFunctions implements
FunctionHelper {
tableValued(Local.class, "local"),
tableValued(HudiMeta.class, "hudi_meta"),
tableValued(IcebergMeta.class, "iceberg_meta"),
+ tableValued(PaimonMeta.class, "paimon_meta"),
tableValued(Hdfs.class, "hdfs"),
tableValued(HttpStream.class, "http_stream"),
tableValued(Numbers.class, "numbers"),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
index 7483ef2e477..7e73639b6dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -78,6 +79,15 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
authConf = AuthenticationConfig.getKerberosConfig(conf);
hadoopAuthenticator =
HadoopAuthenticator.getHadoopAuthenticator(authConf);
+ initPreExecutionAuthenticator();
+ }
+
+ @Override
+ protected synchronized void initPreExecutionAuthenticator() {
+ if (preExecutionAuthenticator == null) {
+ preExecutionAuthenticator = new PreExecutionAuthenticator();
+
preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+ }
}
public String getCatalogType() {
@@ -161,6 +171,23 @@ public abstract class PaimonExternalCatalog extends
ExternalCatalog {
}
}
+ public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping, String queryType) {
+ return getPaimonSystemTable(nameMapping, null, queryType);
+ }
+
+ public org.apache.paimon.table.Table getPaimonSystemTable(NameMapping
nameMapping, String branch,
+ String queryType) {
+ makeSureInitialized();
+ try {
+ return hadoopAuthenticator.doAs(() -> catalog.getTable(new
Identifier(nameMapping.getRemoteDbName(),
+ nameMapping.getRemoteTblName(), branch, queryType)));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get Paimon system table:" +
getName() + "."
+ + nameMapping.getRemoteDbName() + "." +
nameMapping.getRemoteTblName() + "$" + queryType
+ + ", because " + e.getMessage(), e);
+ }
+ }
+
protected String getPaimonCatalogType(String catalogType) {
if (PAIMON_HMS.equalsIgnoreCase(catalogType)) {
return PaimonProperties.PAIMON_HMS_CATALOG;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 2d9218a02ff..a0ede02a32a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -59,11 +59,13 @@ import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Projection;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -72,6 +74,7 @@ import javax.annotation.Nullable;
public class PaimonUtil {
private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
+ private static final Base64.Encoder BASE64_ENCODER =
java.util.Base64.getUrlEncoder().withoutPadding();
public static List<InternalRow> read(
Table table, @Nullable int[] projection, @Nullable Predicate
predicate,
@@ -104,7 +107,7 @@ public class PaimonUtil {
}
public static PaimonPartitionInfo generatePartitionInfo(List<Column>
partitionColumns,
- List<Partition>
paimonPartitions) {
+ List<Partition> paimonPartitions) {
if (CollectionUtils.isEmpty(partitionColumns) ||
paimonPartitions.isEmpty()) {
return PaimonPartitionInfo.EMPTY;
@@ -339,4 +342,33 @@ public class PaimonUtil {
return tSchema;
}
+ public static List<Column> parseSchema(Table table) {
+ List<String> primaryKeys = table.primaryKeys();
+ return parseSchema(table.rowType(), primaryKeys);
+ }
+
+ public static List<Column> parseSchema(RowType rowType, List<String>
primaryKeys) {
+ List<Column> resSchema =
Lists.newArrayListWithCapacity(rowType.getFields().size());
+ rowType.getFields().forEach(field -> {
+ resSchema.add(new Column(field.name().toLowerCase(),
+ PaimonUtil.paimonTypeToDorisType(field.type()),
+ primaryKeys.contains(field.name()),
+ null,
+ field.type().isNullable(),
+ field.description(),
+ true,
+ field.id()));
+ });
+ return resSchema;
+ }
+
+ public static <T> String encodeObjectToString(T t) {
+ try {
+ byte[] bytes = InstantiationUtil.serializeObject(t);
+ return new String(BASE64_ENCODER.encode(bytes),
java.nio.charset.StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 21474a164d0..0d0bf8d3b32 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -56,11 +56,9 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.utils.InstantiationUtil;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -151,7 +149,7 @@ public class PaimonScanNode extends FileQueryScanNode {
protected void doInitialize() throws UserException {
super.doInitialize();
source = new PaimonSource(desc);
- serializedTable = encodeObjectToString(source.getPaimonTable());
+ serializedTable =
PaimonUtil.encodeObjectToString(source.getPaimonTable());
// Todo: Get the current schema id of the table, instead of using -1.
ExternalUtil.initSchemaInfo(params, -1L,
source.getTargetTable().getColumns());
}
@@ -168,17 +166,6 @@ public class PaimonScanNode extends FileQueryScanNode {
predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
}
- private static final Base64.Encoder BASE64_ENCODER =
java.util.Base64.getUrlEncoder().withoutPadding();
-
- public static <T> String encodeObjectToString(T t) {
- try {
- byte[] bytes = InstantiationUtil.serializeObject(t);
- return new String(BASE64_ENCODER.encode(bytes),
java.nio.charset.StandardCharsets.UTF_8);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
if (split instanceof PaimonSplit) {
@@ -195,7 +182,7 @@ public class PaimonScanNode extends FileQueryScanNode {
if (currentQuerySchema.putIfAbsent(schemaId, Boolean.TRUE) == null) {
PaimonExternalTable table = (PaimonExternalTable)
source.getTargetTable();
TableSchema tableSchema =
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
-
.getPaimonSchemaCacheValue(table.getOrBuildNameMapping(),
schemaId).getTableSchema();
+ .getPaimonSchemaCacheValue(table.getOrBuildNameMapping(),
schemaId).getTableSchema();
params.addToHistorySchemaInfo(PaimonUtil.getSchemaInfo(tableSchema));
}
}
@@ -210,7 +197,7 @@ public class PaimonScanNode extends FileQueryScanNode {
if (split != null) {
// use jni reader
rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
- fileDesc.setPaimonSplit(encodeObjectToString(split));
+ fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split));
rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight());
} else {
// use native reader
@@ -226,7 +213,7 @@ public class PaimonScanNode extends FileQueryScanNode {
fileDesc.setSchemaId(paimonSplit.getSchemaId());
}
fileDesc.setFileFormat(fileFormat);
- fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
+
fileDesc.setPaimonPredicate(PaimonUtil.encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot ->
slot.getColumn().getName())
.collect(Collectors.joining(",")));
fileDesc.setDbName(((PaimonExternalTable)
source.getTargetTable()).getDbName());
@@ -380,13 +367,13 @@ public class PaimonScanNode extends FileQueryScanNode {
return Collections.emptyList();
}
int[] projected = desc.getSlots().stream().mapToInt(
- slot -> source.getPaimonTable().rowType()
- .getFieldNames()
- .stream()
- .map(String::toLowerCase)
- .collect(Collectors.toList())
- .indexOf(slot.getColumn().getName()))
- .toArray();
+ slot -> source.getPaimonTable().rowType()
+ .getFieldNames()
+ .stream()
+ .map(String::toLowerCase)
+ .collect(Collectors.toList())
+ .indexOf(slot.getColumn().getName()))
+ .toArray();
Table paimonTable = source.getPaimonTable();
Map<String, String> incrReadParams = getIncrReadParams();
paimonTable = paimonTable.copy(incrReadParams);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java
new file mode 100644
index 00000000000..68d98aead5f
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/PaimonSysTable.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.systable;
+
+import org.apache.doris.analysis.TableValuedFunctionRef;
+import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta;
+import
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
+import org.apache.doris.tablefunction.PaimonTableValuedFunction;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * System table implementation for Paimon metadata tables.
+ */
+public class PaimonSysTable extends SysTable {
+ private static final Logger LOG =
LogManager.getLogger(PaimonSysTable.class);
+
+ private static final Set<String> EXCLUDED_SYS_TABLES = new
HashSet<>(Arrays.asList("binlog", "ro", "audit_log"));
+
+ private static final List<PaimonSysTable> SUPPORTED_PAIMON_SYS_TABLES =
SystemTableLoader.SYSTEM_TABLES
+ .stream()
+ .filter(table -> !EXCLUDED_SYS_TABLES.contains(table))
+ .map(PaimonSysTable::new)
+ .collect(Collectors.toList());
+
+ private final String tableName;
+
+ /**
+ * Creates a new Paimon system table instance.
+ *
+ * @param tableName the name of the system table
+ */
+ protected PaimonSysTable(String tableName) {
+ super(tableName, "paimon_meta");
+ this.tableName = tableName;
+ }
+
+ public static List<PaimonSysTable> getSupportedPaimonSysTables() {
+ return SUPPORTED_PAIMON_SYS_TABLES;
+ }
+
+ @Override
+ public TableValuedFunction createFunction(String ctlName, String dbName,
String sourceNameWithMetaName) {
+ List<String> nameParts = Lists.newArrayList(ctlName, dbName,
+ getSourceTableName(sourceNameWithMetaName));
+ return PaimonMeta.createPaimonMeta(nameParts, tableName);
+ }
+
+ @Override
+ public TableValuedFunctionRef createFunctionRef(String ctlName, String
dbName, String sourceNameWithMetaName) {
+ List<String> nameParts = Lists.newArrayList(ctlName, dbName,
+ getSourceTableName(sourceNameWithMetaName));
+ Map<String, String> params = Maps.newHashMap();
+ params.put(PaimonTableValuedFunction.TABLE,
Joiner.on(".").join(nameParts));
+ params.put(PaimonTableValuedFunction.QUERY_TYPE, tableName);
+ try {
+ return new TableValuedFunctionRef(tvfName, null, params);
+ } catch (org.apache.doris.common.AnalysisException e) {
+ LOG.warn("should not happen. {}.{}.{}", ctlName, dbName,
sourceNameWithMetaName, e);
+ return null;
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
index 9b471030d45..fc635113a1d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
@@ -36,7 +36,8 @@ public class SupportedSysTables {
ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList(
IcebergSysTable.getSupportedIcebergSysTables());
// paimon
- PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList();
+ PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList(
+ PaimonSysTable.getSupportedPaimonSysTables());
// hudi
HUDI_SUPPORTED_SYS_TABLES = Lists.newArrayList();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java
new file mode 100644
index 00000000000..9c0ac35af41
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PaimonMeta.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.PaimonTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table-valued function for accessing Paimon metadata tables.
+ */
+public class PaimonMeta extends TableValuedFunction {
+
+ public PaimonMeta(Properties properties) {
+ super("paimon_meta", properties);
+ }
+
+ public static PaimonMeta createPaimonMeta(List<String> nameParts, String
queryType) {
+ Map<String, String> prop = Maps.newHashMap();
+ prop.put(PaimonTableValuedFunction.TABLE,
Joiner.on(".").join(nameParts));
+ prop.put(PaimonTableValuedFunction.QUERY_TYPE, queryType);
+ return new PaimonMeta(new Properties(prop));
+ }
+
+ @Override
+ protected TableValuedFunctionIf toCatalogFunction() {
+ try {
+ Map<String, String> arguments = getTVFProperties().getMap();
+ return PaimonTableValuedFunction.create(arguments);
+ } catch (Throwable t) {
+ throw new AnalysisException("Can not build
PaimonTableValuedFunction by "
+ + this + ": " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+ return visitor.visitPaimonMeta(this, context);
+ }
+
+ @Override
+ public FunctionSignature customSignature() {
+ return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX,
getArgumentsTypes());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 311a6e83a85..a1cd65a8e8f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -31,6 +31,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
+import org.apache.doris.nereids.trees.expressions.functions.table.PaimonMeta;
import
org.apache.doris.nereids.trees.expressions.functions.table.PartitionValues;
import org.apache.doris.nereids.trees.expressions.functions.table.Partitions;
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
@@ -102,6 +103,10 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(icebergMeta, context);
}
+ default R visitPaimonMeta(PaimonMeta paimonMeta, C context) {
+ return visitTableValuedFunction(paimonMeta, context);
+ }
+
default R visitLocal(Local local, C context) {
return visitTableValuedFunction(local, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
new file mode 100644
index 00000000000..0c44800b943
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.NameMapping;
+import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonUtil;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+import org.apache.doris.thrift.TPaimonMetadataParams;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Table-valued function for querying Paimon system tables metadata.
+ */
+public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
+ public static final String NAME = "paimon_meta";
+ public static final String TABLE = "table";
+ public static final String QUERY_TYPE = "query_type";
+
+ private static final ImmutableSet<String> PROPERTIES_SET =
ImmutableSet.of(TABLE, QUERY_TYPE);
+
+ private final String queryType;
+ private final Table paimonSysTable;
+ private final List<Column> schema;
+ private final Map<String, String> hadoopProps;
+ private final Map<String, String> paimonProps;
+ private final HadoopAuthenticator hadoopAuthenticator;
+ private final TableName paimonTableName;
+ private final long ctlId;
+ private final long dbId;
+ private final long tblId;
+
+    /**
+     * Creates a new Paimon table-valued function instance.
+     *
+     * <p>Resolves the catalog, database and table named by {@code paimonTableName}, records their ids
+     * together with the catalog's Hadoop and Paimon properties, then loads the requested Paimon system
+     * table and parses its schema into the column list returned by {@link #getTableColumns()}.
+     *
+     * @param paimonTableName the target Paimon table name
+     * @param queryType the type of metadata query to perform
+     * @throws AnalysisException if the catalog is not a Paimon catalog, or the database or table
+     *         cannot be found in it
+     */
+    public PaimonTableValuedFunction(TableName paimonTableName, String queryType) throws AnalysisException {
+        this.queryType = queryType;
+        CatalogIf<?> dorisCatalog = Env.getCurrentEnv()
+                .getCatalogMgr()
+                .getCatalog(paimonTableName.getCtl());
+
+        if (!(dorisCatalog instanceof PaimonExternalCatalog)) {
+            throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not a paimon catalog");
+        }
+
+        this.paimonTableName = paimonTableName;
+        PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog;
+        this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties();
+        this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap();
+        this.hadoopAuthenticator = paimonExternalCatalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
+        this.ctlId = paimonExternalCatalog.getId();
+
+        ExternalDatabase<? extends ExternalTable> database = paimonExternalCatalog.getDb(paimonTableName.getDb())
+                .orElseThrow(() -> new AnalysisException(
+                        String.format("Paimon catalog database '%s' does not exist", paimonTableName.getDb())
+                ));
+        this.dbId = database.getId();
+
+        ExternalTable externalTable = database.getTable(paimonTableName.getTbl())
+                .orElseThrow(() -> new AnalysisException(
+                        String.format("Paimon catalog table '%s.%s' does not exist",
+                                paimonTableName.getDb(), paimonTableName.getTbl())
+                ));
+        NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
+        this.tblId = externalTable.getId();
+
+        // The system table is looked up by the base table's name mapping plus the query type.
+        this.paimonSysTable = paimonExternalCatalog.getPaimonSystemTable(buildNameMapping, queryType);
+        this.schema = PaimonUtil.parseSchema(paimonSysTable);
+    }
+
+ public static PaimonTableValuedFunction create(Map<String, String> params)
throws AnalysisException {
+ Map<String, String> validParams = Maps.newHashMap();
+ for (String key : params.keySet()) {
+ if (!PROPERTIES_SET.contains(key.toLowerCase())) {
+ throw new AnalysisException("'" + key + "' is invalid
property");
+ }
+ // check ctl, db, tbl
+ validParams.put(key.toLowerCase(), params.get(key));
+ }
+
+ String tableName = validParams.get(TABLE);
+ String queryType = validParams.get(QUERY_TYPE);
+ if (tableName == null || queryType == null) {
+ throw new AnalysisException("Invalid paimon metadata query");
+ }
+
+ String[] names = tableName.split("\\.");
+ if (names.length != 3) {
+ throw new AnalysisException("The paimon table name contains the
catalogName, databaseName, and tableName");
+ }
+ TableName paimonTableName = new TableName(names[0], names[1],
names[2]);
+ // check auth
+ if (!Env.getCurrentEnv().getAccessManager()
+ .checkTblPriv(ConnectContext.get(), paimonTableName,
PrivPredicate.SELECT)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"SELECT",
+ ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
+ paimonTableName.getDb() + ": " + paimonTableName.getTbl());
+ }
+ return new PaimonTableValuedFunction(paimonTableName, queryType);
+ }
+
+ @Override
+ public TMetadataType getMetadataType() {
+ return TMetadataType.PAIMON;
+ }
+
+    /**
+     * Plans a scan over the Paimon system table and turns every resulting split into a meta scan range.
+     *
+     * @param requiredFileds lower-case names of the columns to read; each one is mapped to its position
+     *        in the system table's row type ({@code -1} if the name is not present)
+     * @return one {@link TMetaScanRange} per planned split
+     * @throws RuntimeException if planning fails; the message is the root-cause message and the
+     *         original exception is attached as the cause
+     */
+    @Override
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+        // Lower-case the Paimon field names once instead of once per requested column.
+        List<String> lowerCaseFieldNames = paimonSysTable.rowType().getFieldNames()
+                .stream()
+                .map(String::toLowerCase)
+                .collect(Collectors.toList());
+        int[] projections = requiredFileds.stream()
+                .mapToInt(lowerCaseFieldNames::indexOf)
+                .toArray();
+
+        List<Split> splits;
+        try {
+            splits = hadoopAuthenticator.doAs(
+                    () -> paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits());
+        } catch (Exception e) {
+            // Keep the original exception as the cause so its stack trace is not lost.
+            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
+        }
+
+        return splits.stream().map(this::createMetaScanRange).collect(Collectors.toList());
+    }
+
+ @Override
+ public String getTableName() {
+ return "PaimonTableValuedFunction<" + queryType + ">";
+ }
+
+ @Override
+ public List<Column> getTableColumns() throws AnalysisException {
+ return schema;
+ }
+
+ private TMetaScanRange createMetaScanRange(Split split) {
+ TMetaScanRange tMetaScanRange = new TMetaScanRange();
+ tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
+
+ TPaimonMetadataParams tPaimonMetadataParams = new
TPaimonMetadataParams();
+ tPaimonMetadataParams.setCtlId(ctlId);
+ tPaimonMetadataParams.setDbId(dbId);
+ tPaimonMetadataParams.setTblId(tblId);
+ tPaimonMetadataParams.setQueryType(queryType);
+ tPaimonMetadataParams.setDbName(paimonTableName.getDb());
+ tPaimonMetadataParams.setTblName(paimonTableName.getTbl());
+ tPaimonMetadataParams.setHadoopProps(hadoopProps);
+ tPaimonMetadataParams.setPaimonProps(paimonProps);
+
tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split));
+
+ tMetaScanRange.setPaimonParams(tPaimonMetadataParams);
+ return tMetaScanRange;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index b57e322bb60..2771189329c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -60,6 +60,8 @@ public abstract class TableValuedFunctionIf {
return IcebergTableValuedFunction.create(params);
case HudiTableValuedFunction.NAME:
return new HudiTableValuedFunction(params);
+ case PaimonTableValuedFunction.NAME:
+ return PaimonTableValuedFunction.create(params);
case BackendsTableValuedFunction.NAME:
return new BackendsTableValuedFunction(params);
case FrontendsTableValuedFunction.NAME:
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4ab2fa2881b..ea86689f9f2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -540,6 +540,19 @@ struct TIcebergMetadataParams {
2: optional map<string, string> hadoop_props
}
+
+struct TPaimonMetadataParams {
+ 1: optional string db_name
+ 2: optional string tbl_name
+ 3: optional string query_type
+ 4: optional i64 ctl_id
+ 5: optional i64 db_id
+ 6: optional i64 tbl_id
+ 7: optional string serialized_split
+ 8: optional map<string, string> hadoop_props
+ 9: optional map<string, string> paimon_props
+}
+
struct THudiMetadataParams {
1: optional Types.THudiQueryType hudi_query_type
2: optional string catalog
@@ -608,6 +621,7 @@ struct TMetaScanRange {
10: optional TMetaCacheStatsParams meta_cache_stats_params
11: optional TPartitionValuesMetadataParams partition_values_params
12: optional THudiMetadataParams hudi_params
+ 13: optional TPaimonMetadataParams paimon_params
}
// Specification of an individual data range which is held in its entirety
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index deb34ea91dc..4fe2e1ab082 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -748,6 +748,7 @@ enum TMetadataType {
PARTITIONS = 9,
PARTITION_VALUES = 10,
HUDI = 11,
+ PAIMON = 12,
}
enum THudiQueryType {
diff --git
a/regression-test/data/external_table_p0/paimon/paimon_system_table.out
b/regression-test/data/external_table_p0/paimon/paimon_system_table.out
new file mode 100644
index 00000000000..3bfc38f8bd1
Binary files /dev/null and
b/regression-test/data/external_table_p0/paimon/paimon_system_table.out differ
diff --git
a/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy
b/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy
new file mode 100644
index 00000000000..57fb6c45f10
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/paimon_system_table.groovy
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("paimon_system_table",
"p0,external,doris,external_docker,external_docker_doris") {
+
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disabled paimon test")
+ return
+ }
+
+    // Asserts that two query results are identical: same number of rows, same number of columns
+    // in every row, and equal values cell by cell. `tableType` only labels log and failure messages.
+    def validateQueryResults = { List<List<Object>> result1, List<List<Object>> result2, String tableType ->
+        logger.info("${tableType} - Direct query result size: ${result1.size()}")
+        logger.info("${tableType} - Meta query result size: ${result2.size()}")
+        assertEquals(result1.size(), result2.size(), tableType + " query size mismatch")
+
+        for (int i = 0; i < result1.size(); i++) {
+            List<Object> row1 = result1.get(i)
+            List<Object> row2 = result2.get(i)
+
+            // Check the column count first: the cell loop below is bounded by row1 only, so a shorter
+            // row2 would fail with a bare index error and a longer row2 would go unnoticed.
+            assertEquals(row1.size(), row2.size(),
+                    String.format("%s column count mismatch at row [%d]", tableType, i))
+
+            for (int j = 0; j < row1.size(); j++) {
+                assertEquals(row1.get(j), row2.get(j),
+                        String.format("%s data mismatch at [%d][%d]", tableType, i, j))
+            }
+        }
+
+        logger.info(tableType + " validation passed: " + result1.size() + " rows verified")
+    }
+ String catalog_name = "paimon_timestamp_types"
+ try {
+
+ String db_name = "flink_paimon"
+ String tableName = "ts_scale_orc"
+ String minio_port =
context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='paimon',
+ 'warehouse' = 's3://warehouse/wh/',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """use ${db_name};"""
+ logger.info("use " + db_name)
+
+ // 1. test Paimon data system table
+ logger.info("query data from paimon system table")
+ List<List<Object>> paimonTableList = sql """ show tables; """
+ boolean targetTableExists = paimonTableList.any { row ->
+ row.size() > 0 && row[0].toString().equals(tableName)
+ }
+ assertTrue(targetTableExists, "Target table '${tableName}' not found
in database '${db_name}'")
+
+ // test all paimon system table
+ List<String> paimonSystemTableList = new
ArrayList<>(Arrays.asList("manifests", "snapshots", "options", "schemas",
+ "partitions", "buckets", "files", "tags", "branches",
"consumers", "aggregation_fields",
+ "statistics", "table_indexes"))
+
+
+ // Iterate through all system tables for consistency testing
+ for (String systemTable : paimonSystemTableList) {
+ logger.info("Testing system table: " + systemTable)
+
+ // Direct query on system table
+ List<List<Object>> directQuery = sql """select * from
${tableName}\$${systemTable}"""
+
+ // // Query through paimon_meta function
+ List<List<Object>> metaQuery = sql """select * from paimon_meta(
+ "table" = "${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "${systemTable}");
+ """
+
+ validateQueryResults(directQuery, metaQuery, systemTable)
+ }
+
+ // 2 Verify system table projection and predicate functionality
+ // 2.1 Column projection tests
+ qt_direct_query__snapshots_result """select * from
${tableName}\$snapshots order by snapshot_id"""
+ qt_meta_query__snapshots_result """select * from paimon_meta(
+ "table" = "${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "snapshots") order by snapshot_id;
+ """
+ // column name
+ qt_paimon_snapshots_core_fields_direct_query """
+ select snapshot_id,
+ schema_id,
+ commit_user,
+ commit_identifier,
+ commit_kind,
+ base_manifest_list,
+ delta_manifest_list,
+ changelog_manifest_list,
+ total_record_count,
+ delta_record_count,
+ changelog_record_count from ${tableName}\$snapshots
+ order by snapshot_id;
+ """
+ qt_paimon_snapshots_core_fields_meta_query """
+ select snapshot_id,
+ schema_id,
+ commit_user,
+ commit_identifier,
+ commit_kind,
+ base_manifest_list,
+ delta_manifest_list,
+ changelog_manifest_list,
+ total_record_count,
+ delta_record_count,
+ changelog_record_count from paimon_meta(
+ "table" = "${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "snapshots")
+ order by snapshot_id;
+ """
+
+ qt_paimon_snapshots_reordered_filed_direct_query """
+ select schema_id,
+ snapshot_id,
+ commit_user,
+ commit_identifier,
+ commit_kind,
+ base_manifest_list,
+ delta_manifest_list,
+ changelog_manifest_list,
+ total_record_count,
+ delta_record_count,
+ changelog_record_count from
${tableName}\$snapshots
+ order by snapshot_id;
+ """
+
+ qt_paimon_snapshots_reordered_filed_meta_query """
+ select schema_id,
+ snapshot_id,
+ commit_user,
+ commit_identifier,
+ commit_kind,
+ base_manifest_list,
+ delta_manifest_list,
+ changelog_manifest_list,
+ total_record_count,
+ delta_record_count,
+ changelog_record_count from paimon_meta(
+ "table" =
"${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "snapshots")
+ order by snapshot_id;
+ """
+ // 2.2 Predicate filtering tests
+ List<List<Object>> res1 = sql """ select snapshot_id from
${tableName}\$snapshots order by snapshot_id;"""
+ List<List<Object>> res2 = sql """ select snapshot_id from paimon_meta(
+ "table" =
"${catalog_name}.${db_name}.${tableName}",
+ "query_type" =
"snapshots")
+ order by snapshot_id;
+ """
+
+ qt_snapshot_id_direct_query """select snapshot_id from
${tableName}\$snapshots order by snapshot_id;
+ """
+ qt_snapshot_id_meta_query """
+ select snapshot_id from paimon_meta(
+ "table" =
"${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "snapshots")
+ order by snapshot_id;
+ """
+
+        assertTrue(res1.size() > 0, "Direct query should return data")
+        assertTrue(res2.size() > 0, "Meta query should return data")
+        // Take each id from its own result set: res1 is the direct `$snapshots` query,
+        // res2 is the paimon_meta() query.
+        String direct_query_snapshot_id = String.valueOf(res1[0][0]);
+        String meta_query_snapshot_id = String.valueOf(res2[0][0]);
+        logger.info("snapshot_id=" + direct_query_snapshot_id)
+ qt_direct_query_snapshot_id_predicate """select schema_id,
+ snapshot_id,
+ commit_user,
+ commit_identifier,
+ commit_kind,
+ base_manifest_list,
+ delta_manifest_list,
+ changelog_manifest_list,
+ total_record_count,
+ delta_record_count,
+ changelog_record_count from
${tableName}\$snapshots
+ where
snapshot_id=${direct_query_snapshot_id}
+ order by snapshot_id;
+
+ """
+
+ qt_meta_query_snapshot_id_predicate """select schema_id,
+ snapshot_id,
+ commit_user,
+ commit_identifier,
+ commit_kind,
+ base_manifest_list,
+ delta_manifest_list,
+ changelog_manifest_list,
+ total_record_count,
+ delta_record_count,
+ changelog_record_count from
paimon_meta(
+ "table" =
"${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "snapshots")
+ where
snapshot_id=${meta_query_snapshot_id}
+ order by snapshot_id;
+
+ """
+
+ //2.3 Aggregation functions
+ qt_direct_query_snapshot_id_count """
+ select count(*) from ${tableName}\$snapshots
+ where snapshot_id=${direct_query_snapshot_id}
+ """
+ qt_meta_query_snapshot_id_count """
+ select count(*) from paimon_meta(
+ "table" =
"${catalog_name}.${db_name}.${tableName}",
+ "query_type" = "snapshots")
+ where snapshot_id=${meta_query_snapshot_id}
+ """
+ //2.4 Join operations between system tables
+ qt_direct_query_snapshots_join """
+ SELECT s.snapshot_id, t.schema_id, t.fields
+ FROM ${tableName}\$snapshots s JOIN ${tableName}\$schemas t
+ ON s.schema_id=t.schema_id where
s.snapshot_id=${direct_query_snapshot_id};
+ """
+
+ //2.5 Table description queries
+ qt_desc_direct_query_ctl_db_table """
+ desc ${catalog_name}.${db_name}.${tableName}\$snapshots
+ """
+ qt_desc_direct_query_db_table """
+ desc ${db_name}.${tableName}\$snapshots
+ """
+
+ qt_desc_direct_query_table """
+ desc ${tableName}\$snapshots
+ """
+
+ } catch (Exception e) {
+ logger.error("Paimon system table test failed: " + e.getMessage())
+ throw e
+ } finally {
+ // clean resource
+ try {
+ sql """drop catalog if exists ${catalog_name}"""
+ } catch (Exception e) {
+ logger.warn("Failed to cleanup catalog: " + e.getMessage())
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]