This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 8d9bc866bd8 branch-3.1: [refactor](paimon) clean unused PaimonJniScanner code and refactor PaimonSysJniScanner (#54679 #54804) (#55020)
8d9bc866bd8 is described below
commit 8d9bc866bd86da4e3d7afd329130453004e6b184
Author: Socrates <[email protected]>
AuthorDate: Thu Aug 21 12:03:54 2025 +0800
branch-3.1: [refactor](paimon) clean unused PaimonJniScanner code and refactor PaimonSysJniScanner (#54679 #54804) (#55020)
bp:
#54679 adapt session time_zone for PaimonJniScanner
#54804 refactor PaimonSysJniScanner
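
For readers of this backport: both JNI readers now receive their splits as one comma-joined string of base64 payloads, and the recurring comment below ("," is not in base64) is the invariant that makes this safe: the base64 alphabet contains only A-Z, a-z, 0-9, '+', '/' and '='. A minimal round-trip sketch, illustrative only and not part of this patch (the class name SplitCodecSketch is hypothetical):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

public class SplitCodecSketch {
    // FE side: serialize each split and join with "," (never in base64 output).
    static String join(List<byte[]> serializedSplits) {
        return serializedSplits.stream()
                .map(Base64.getEncoder()::encodeToString)
                .collect(Collectors.joining(","));
    }

    // JNI side: split on "," and decode each payload back.
    static List<byte[]> split(String joined) {
        return Arrays.stream(joined.split(","))
                .map(Base64.getDecoder()::decode)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> splits = List.of("split-a".getBytes(StandardCharsets.UTF_8),
                "split-b".getBytes(StandardCharsets.UTF_8));
        String wire = join(splits); // "c3BsaXQtYQ==,c3BsaXQtYg=="
        List<byte[]> back = split(wire);
        System.out.println(new String(back.get(1), StandardCharsets.UTF_8)); // split-b
    }
}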
---
.../format/table/iceberg_sys_table_jni_reader.cpp | 11 +-
.../format/table/iceberg_sys_table_jni_reader.h | 4 +-
be/src/vec/exec/format/table/paimon_jni_reader.cpp | 9 +-
.../format/table/paimon_sys_table_jni_reader.cpp | 23 +--
.../format/table/paimon_sys_table_jni_reader.h | 5 +-
be/src/vec/exec/scan/vmeta_scanner.cpp | 31 +--
be/src/vec/exec/scan/vmeta_scanner.h | 2 -
.../create_preinstalled_scripts/paimon/run08.sql | 15 ++
.../doris/iceberg/IcebergSysTableJniScanner.java | 32 ++-
.../org/apache/doris/paimon/PaimonColumnValue.java | 19 +-
.../org/apache/doris/paimon/PaimonJniScanner.java | 7 +-
.../doris/paimon/PaimonSysTableJniScanner.java | 184 +++++++----------
.../org/apache/doris/paimon/PaimonTableCache.java | 221 ---------------------
.../datasource/paimon/source/PaimonScanNode.java | 10 -
.../datasource/tvf/source/MetadataScanNode.java | 42 +++-
.../tablefunction/BackendsTableValuedFunction.java | 5 +-
.../tablefunction/CatalogsTableValuedFunction.java | 5 +-
.../FrontendsDisksTableValuedFunction.java | 5 +-
.../FrontendsTableValuedFunction.java | 5 +-
.../tablefunction/HudiTableValuedFunction.java | 4 +-
.../tablefunction/IcebergTableValuedFunction.java | 27 ++-
.../tablefunction/JobsTableValuedFunction.java | 5 +-
.../tablefunction/MetadataTableValuedFunction.java | 2 +-
.../tablefunction/MvInfosTableValuedFunction.java | 5 +-
.../tablefunction/PaimonTableValuedFunction.java | 41 +---
.../PartitionValuesTableValuedFunction.java | 4 +-
.../PartitionsTableValuedFunction.java | 5 +-
.../tablefunction/TasksTableValuedFunction.java | 5 +-
gensrc/thrift/PlanNodes.thrift | 12 +-
.../test_paimon_timestamp_with_time_zone.out | Bin 0 -> 483 bytes
.../test_paimon_timestamp_with_time_zone.groovy | 61 ++++++
31 files changed, 294 insertions(+), 512 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
index 2d3519b7dd2..7d35329a2bc 100644
--- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
@@ -27,8 +27,8 @@ static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
IcebergSysTableJniReader::IcebergSysTableJniReader(
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
- RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
- : JniReader(file_slot_descs, state, profile), _range_params(range_params) {}
+ RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+ : JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {}
Status IcebergSysTableJniReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
@@ -39,11 +39,12 @@ Status IcebergSysTableJniReader::init_reader(
required_types.emplace_back(JniConnector::get_jni_type(desc->type()));
}
std::map<std::string, std::string> params;
- params["serialized_task"] = _range_params.serialized_task;
+ // "," is not in base64
+ params["serialized_splits"] = join(_meta_scan_range.serialized_splits,
",");
params["required_fields"] = join(required_fields, ",");
params["required_types"] = join(required_types, "#");
- params["time_zone"] = _state->timezone_obj().name();
- for (const auto& kv : _range_params.hadoop_props) {
+ params["time_zone"] = _state->timezone();
+ for (const auto& kv : _meta_scan_range.hadoop_props) {
params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
}
_jni_connector =
diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
index ed867e46abe..982f4357343 100644
--- a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
@@ -47,7 +47,7 @@ class IcebergSysTableJniReader : public JniReader {
public:
IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
- const TIcebergMetadataParams& range_params);
+ const TMetaScanRange& meta_scan_range);
~IcebergSysTableJniReader() override = default;
@@ -55,7 +55,7 @@ public:
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
private:
- const TIcebergMetadataParams& _range_params;
+ const TMetaScanRange& _meta_scan_range;
};
#include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index 8b10e3a54ff..b603a3ccc54 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -49,18 +49,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
column_types.emplace_back(JniConnector::get_jni_type(desc->type()));
}
std::map<String, String> params;
- params["db_name"] = range.table_format_params.paimon_params.db_name;
- params["table_name"] = range.table_format_params.paimon_params.table_name;
params["paimon_split"] =
range.table_format_params.paimon_params.paimon_split;
- params["paimon_column_names"] =
range.table_format_params.paimon_params.paimon_column_names;
params["paimon_predicate"] =
range.table_format_params.paimon_params.paimon_predicate;
- params["ctl_id"] =
std::to_string(range.table_format_params.paimon_params.ctl_id);
- params["db_id"] =
std::to_string(range.table_format_params.paimon_params.db_id);
- params["tbl_id"] =
std::to_string(range.table_format_params.paimon_params.tbl_id);
- params["last_update_time"] =
-
std::to_string(range.table_format_params.paimon_params.last_update_time);
params["required_fields"] = join(column_names, ",");
params["columns_types"] = join(column_types, "#");
+ params["time_zone"] = _state->timezone();
if (range_params->__isset.serialized_table) {
params["serialized_table"] = range_params->serialized_table;
}
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
index f63de6586cf..56daa90d0c2 100644
--- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.cpp
@@ -24,12 +24,11 @@ namespace doris::vectorized {
#include "common/compile_check_begin.h"
const std::string PaimonSysTableJniReader::HADOOP_OPTION_PREFIX = "hadoop.";
-const std::string PaimonSysTableJniReader::PAIMON_OPTION_PREFIX = "paimon.";
PaimonSysTableJniReader::PaimonSysTableJniReader(
const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
- RuntimeProfile* profile, const TPaimonMetadataParams& range_params)
- : JniReader(file_slot_descs, state, profile), _range_params(range_params) {
+ RuntimeProfile* profile, const TMetaScanRange& meta_scan_range)
+ : JniReader(file_slot_descs, state, profile), _meta_scan_range(meta_scan_range) {
std::vector<std::string> required_fields;
std::vector<std::string> required_types;
for (const auto& desc : _file_slot_descs) {
@@ -38,21 +37,13 @@ PaimonSysTableJniReader::PaimonSysTableJniReader(
}
std::map<std::string, std::string> params;
- params["db_name"] = _range_params.db_name;
- params["tbl_name"] = _range_params.tbl_name;
- params["query_type"] = _range_params.query_type;
- params["ctl_id"] = std::to_string(_range_params.ctl_id);
- params["db_id"] = std::to_string(_range_params.db_id);
- params["tbl_id"] = std::to_string(_range_params.tbl_id);
- params["serialized_split"] = _range_params.serialized_split;
+ params["serialized_table"] = _meta_scan_range.serialized_table;
+ // "," is not in base64
+ params["serialized_splits"] = join(_meta_scan_range.serialized_splits,
",");
params["required_fields"] = join(required_fields, ",");
params["required_types"] = join(required_types, "#");
-
- for (const auto& kv : _range_params.paimon_props) {
- params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
- }
-
- for (const auto& kv : _range_params.hadoop_props) {
+ params["time_zone"] = _state->timezone();
+ for (const auto& kv : _meta_scan_range.hadoop_props) {
params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
}
diff --git a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
index f7b3108f5af..a6f43899e2d 100644
--- a/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_sys_table_jni_reader.h
@@ -46,10 +46,9 @@ class PaimonSysTableJniReader : public JniReader {
public:
static const std::string HADOOP_OPTION_PREFIX;
- static const std::string PAIMON_OPTION_PREFIX;
PaimonSysTableJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
- const TPaimonMetadataParams& range_params);
+ const TMetaScanRange& meta_scan_range);
~PaimonSysTableJniReader() override = default;
@@ -58,7 +57,7 @@ public:
private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
- const TPaimonMetadataParams& _range_params;
+ const TMetaScanRange& _meta_scan_range;
};
#include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index b8ae79925ac..0860d2bb979 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -67,14 +67,14 @@ Status VMetaScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::open(state));
if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
// TODO: refactor this code
- auto reader = IcebergSysTableJniReader::create_unique(
-         _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.iceberg_params);
+ auto reader = IcebergSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
+                                                       _scan_range.meta_scan_range);
const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
} else if (_scan_range.meta_scan_range.metadata_type == TMetadataType::PAIMON) {
- auto reader = PaimonSysTableJniReader::create_unique(
-         _tuple_desc->slots(), state, _profile, _scan_range.meta_scan_range.paimon_params);
+ auto reader = PaimonSysTableJniReader::create_unique(_tuple_desc->slots(), state, _profile,
+                                                      _scan_range.meta_scan_range);
const std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
_reader = std::move(reader);
@@ -251,9 +251,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
VLOG_CRITICAL << "VMetaScanner::_fetch_metadata";
TFetchSchemaTableDataRequest request;
switch (meta_scan_range.metadata_type) {
- case TMetadataType::ICEBERG:
- RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range, &request));
- break;
case TMetadataType::HUDI:
RETURN_IF_ERROR(_build_hudi_metadata_request(meta_scan_range, &request));
break;
@@ -319,26 +316,6 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
return Status::OK();
}
-Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
-                                                     TFetchSchemaTableDataRequest* request) {
- VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
- if (!meta_scan_range.__isset.iceberg_params) {
- return Status::InternalError("Can not find TIcebergMetadataParams from meta_scan_range.");
- }
-
- // create request
- request->__set_cluster_name("");
- request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
-
- // create TMetadataTableRequestParams
- TMetadataTableRequestParams metadata_table_params;
- metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
- metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);
-
- request->__set_metada_table_params(metadata_table_params);
- return Status::OK();
-}
-
Status VMetaScanner::_build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request) {
VLOG_CRITICAL << "VMetaScanner::_build_hudi_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h
index 7b344faf18b..1974a713584 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -64,8 +64,6 @@ protected:
private:
Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>& columns);
Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
- Status _build_iceberg_metadata_request(const TMetaScanRange& meta_scan_range,
-                                        TFetchSchemaTableDataRequest* request);
Status _build_hudi_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_backends_metadata_request(const TMetaScanRange& meta_scan_range,
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql
new file mode 100644
index 00000000000..073d26548a0
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run08.sql
@@ -0,0 +1,15 @@
+use paimon;
+create database if not exists test_paimon_spark;
+use test_paimon_spark;
+
+SET TIME ZONE '+08:00';
+
+CREATE TABLE IF NOT EXISTS t_ts_ntz (
+ id INT,
+ ts TIMESTAMP,
+ ts_ntz TIMESTAMP_NTZ
+) USING paimon;
+
+INSERT INTO t_ts_ntz VALUES
+    (1, CAST('2025-08-12 06:00:00+00:00' AS TIMESTAMP), CAST('2025-08-12 06:00:00' AS TIMESTAMP_NTZ)),
+    (2, CAST('2025-08-12 14:00:00+08:00' AS TIMESTAMP), CAST('2025-08-12 14:00:00' AS TIMESTAMP_NTZ));
\ No newline at end of file
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
index 350d0d872c3..9603ed35504 100644
--- a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
+import com.google.common.base.Preconditions;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterator;
@@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -47,16 +50,21 @@ public class IcebergSysTableJniScanner extends JniScanner {
private static final String HADOOP_OPTION_PREFIX = "hadoop.";
private final ClassLoader classLoader;
private final PreExecutionAuthenticator preExecutionAuthenticator;
- private final FileScanTask scanTask;
+ private final Iterator<FileScanTask> scanTasks;
private final List<NestedField> fields;
private final String timezone;
private CloseableIterator<StructLike> reader;
public IcebergSysTableJniScanner(int batchSize, Map<String, String> params) {
this.classLoader = this.getClass().getClassLoader();
- this.scanTask = SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
+ List<FileScanTask> scanTasks = Arrays.stream(params.get("serialized_splits").split(","))
+         .map(SerializationUtil::deserializeFromBase64)
+         .map(obj -> (FileScanTask) obj)
+         .collect(Collectors.toList());
+ Preconditions.checkState(!scanTasks.isEmpty(), "scanTasks should not be empty");
+ this.scanTasks = scanTasks.iterator();
String[] requiredFields = params.get("required_fields").split(",");
- this.fields = selectSchema(scanTask.schema().asStruct(), requiredFields);
+ this.fields = selectSchema(scanTasks.get(0).schema().asStruct(), requiredFields);
this.timezone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
Map<String, String> hadoopOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
@@ -69,8 +77,14 @@ public class IcebergSysTableJniScanner extends JniScanner {
@Override
public void open() throws IOException {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ nextScanTask();
+ }
+
+ private void nextScanTask() throws IOException {
+ Preconditions.checkArgument(scanTasks.hasNext());
+ FileScanTask scanTask = scanTasks.next();
try {
- Thread.currentThread().setContextClassLoader(classLoader);
preExecutionAuthenticator.execute(() -> {
// execute FileScanTask to get rows
reader = scanTask.asDataTask().rows().iterator();
@@ -78,7 +92,7 @@ public class IcebergSysTableJniScanner extends JniScanner {
});
} catch (Exception e) {
this.close();
- String msg = String.format("Failed to open
IcebergMetadataJniScanner");
+ String msg = String.format("Failed to open next scan task: %s",
scanTask);
LOG.error(msg, e);
throw new IOException(msg, e);
}
@@ -86,11 +100,11 @@ public class IcebergSysTableJniScanner extends JniScanner {
@Override
protected int getNext() throws IOException {
- if (reader == null) {
- return 0;
- }
int rows = 0;
- while (reader.hasNext() && rows < getBatchSize()) {
+ while ((reader.hasNext() || scanTasks.hasNext()) && rows < getBatchSize()) {
+ if (!reader.hasNext()) {
+ nextScanTask();
+ }
StructLike row = reader.next();
for (int i = 0; i < fields.size(); i++) {
NestedField field = fields.get(i);
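
For context on the hunk above: getNext() now drains the current task's reader and, when it runs dry, advances to the next FileScanTask. A self-contained sketch of that iterator-chaining pattern, illustrative only and not this scanner's code (ChainedReaderSketch is a hypothetical name; it loops after switching sources so a reader that happens to be empty is tolerated):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChainedReaderSketch<T> {
    private final Iterator<Iterator<T>> sources;
    private Iterator<T> current;

    public ChainedReaderSketch(List<Iterator<T>> readers) {
        this.sources = readers.iterator();
        // Mirrors the scanner's precondition that at least one task exists.
        this.current = this.sources.next();
    }

    // Drain up to batchSize rows, switching readers when one is exhausted.
    public int nextBatch(List<T> out, int batchSize) {
        int rows = 0;
        while ((current.hasNext() || sources.hasNext()) && rows < batchSize) {
            if (!current.hasNext()) {
                current = sources.next();
                continue; // the freshly opened reader may itself be empty
            }
            out.add(current.next());
            rows++;
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> readers = List.of(
                List.of(1, 2).iterator(), List.<Integer>of().iterator(), List.of(3).iterator());
        List<Integer> batch = new ArrayList<>();
        new ChainedReaderSketch<>(readers).nextBatch(batch, 10);
        System.out.println(batch); // [1, 2, 3]
    }
}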
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
index 73aa6ce8550..af8a13149e1 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java
@@ -46,15 +46,17 @@ public class PaimonColumnValue implements ColumnValue {
private DataGetters record;
private ColumnType dorisType;
private DataType dataType;
+ private String timeZone;
public PaimonColumnValue() {
}
- public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType) {
+ public PaimonColumnValue(DataGetters record, int idx, ColumnType columnType, DataType dataType, String timeZone) {
this.idx = idx;
this.record = record;
this.dorisType = columnType;
this.dataType = dataType;
+ this.timeZone = timeZone;
}
public void setIdx(int idx, ColumnType dorisType, DataType dataType) {
@@ -67,6 +69,10 @@ public class PaimonColumnValue implements ColumnValue {
this.record = record;
}
+ public void setTimeZone(String timeZone) {
+ this.timeZone = timeZone;
+ }
+
@Override
public boolean canGetStringAsBytes() {
return true;
@@ -136,7 +142,8 @@ public class PaimonColumnValue implements ColumnValue {
public LocalDateTime getDateTime() {
Timestamp ts = record.getTimestamp(idx, dorisType.getPrecision());
if (dataType instanceof LocalZonedTimestampType) {
- return LocalDateTime.ofInstant(ts.toInstant(), ZoneId.systemDefault());
+ return ts.toLocalDateTime().atZone(ZoneId.of("UTC"))
+         .withZoneSameInstant(ZoneId.of(timeZone)).toLocalDateTime();
} else {
return ts.toLocalDateTime();
}
@@ -157,7 +164,7 @@ public class PaimonColumnValue implements ColumnValue {
InternalArray recordArray = record.getArray(idx);
for (int i = 0; i < recordArray.size(); i++) {
PaimonColumnValue arrayColumnValue = new PaimonColumnValue((DataGetters) recordArray, i,
- dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType());
+ dorisType.getChildTypes().get(0), ((ArrayType) dataType).getElementType(), timeZone);
values.add(arrayColumnValue);
}
}
@@ -168,13 +175,13 @@ public class PaimonColumnValue implements ColumnValue {
InternalArray key = map.keyArray();
for (int i = 0; i < key.size(); i++) {
PaimonColumnValue keyColumnValue = new PaimonColumnValue((DataGetters) key, i,
- dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType());
+ dorisType.getChildTypes().get(0), ((MapType) dataType).getKeyType(), timeZone);
keys.add(keyColumnValue);
}
InternalArray value = map.valueArray();
for (int i = 0; i < value.size(); i++) {
PaimonColumnValue valueColumnValue = new PaimonColumnValue((DataGetters) value, i,
- dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType());
+ dorisType.getChildTypes().get(1), ((MapType) dataType).getValueType(), timeZone);
values.add(valueColumnValue);
}
}
@@ -185,7 +192,7 @@ public class PaimonColumnValue implements ColumnValue {
InternalRow row = record.getRow(idx, structFieldIndex.size());
for (int i : structFieldIndex) {
values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i),
- ((RowType) dataType).getFields().get(i).type()));
+ ((RowType) dataType).getFields().get(i).type(), timeZone));
}
}
}
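
For context on the PaimonColumnValue change above: values of Paimon's TIMESTAMP WITH LOCAL TIME ZONE type are now rendered in the session time zone passed down from the BE rather than the JVM default. A minimal sketch of the conversion, illustrative only (TimestampLtzSketch is a hypothetical name), treating the stored wall time as UTC just as the new getDateTime() does:

import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampLtzSketch {
    // Reinterpret a UTC wall time as an instant and render it in the
    // session time zone instead of ZoneId.systemDefault().
    static LocalDateTime toSessionTime(LocalDateTime utcWallTime, String sessionTz) {
        return utcWallTime.atZone(ZoneId.of("UTC"))
                .withZoneSameInstant(ZoneId.of(sessionTz))
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime utc = LocalDateTime.parse("2025-08-12T06:00:00");
        System.out.println(toSessionTime(utc, "Asia/Shanghai")); // 2025-08-12T14:00
    }
}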
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index d25d5f7b94f..4dc1fe90a33 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -39,17 +39,14 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import java.util.stream.Collectors;
public class PaimonJniScanner extends JniScanner {
private static final Logger LOG = LoggerFactory.getLogger(PaimonJniScanner.class);
- @Deprecated
- private static final String PAIMON_OPTION_PREFIX = "paimon.";
- @Deprecated
private static final String HADOOP_OPTION_PREFIX = "hadoop.";
private final Map<String, String> params;
- @Deprecated
private final Map<String, String> hadoopOptionParams;
private final String paimonSplit;
private final String paimonPredicate;
@@ -76,6 +73,8 @@ public class PaimonJniScanner extends JniScanner {
}
paimonSplit = params.get("paimon_split");
paimonPredicate = params.get("paimon_predicate");
+ String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
+ columnValue.setTimeZone(timeZone);
initTableInfo(columnTypes, requiredFields, batchSize);
hadoopOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
index 0344c8b2db4..79289bb8256 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonSysTableJniScanner.java
@@ -21,9 +21,8 @@ import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
-import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
-import org.apache.doris.paimon.PaimonTableCache.TableExt;
+import com.google.common.base.Preconditions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
@@ -36,39 +35,32 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TimeZone;
import java.util.stream.Collectors;
/**
* JNI-based scanner for reading Apache Paimon system tables
+ * TODO: unify this with PaimonJniScanner in future
*/
public class PaimonSysTableJniScanner extends JniScanner {
private static final Logger LOG = LoggerFactory.getLogger(PaimonSysTableJniScanner.class);
private static final String HADOOP_OPTION_PREFIX = "hadoop.";
- private static final String PAIMON_OPTION_PREFIX = "paimon.";
-
- private final Map<String, String> params;
- private final Map<String, String> hadoopOptionParams;
- private final Map<String, String> paimonOptionParams;
private final ClassLoader classLoader;
- private final Split paimonSplit;
- private Table table;
- private RecordReader<InternalRow> reader;
+ private final Table table;
+ private final Iterator<Split> paimonSplits;
private final PaimonColumnValue columnValue = new PaimonColumnValue();
- private List<DataType> paimonDataTypeList;
- private List<String> paimonAllFieldNames;
+ private final List<String> paimonAllFieldNames;
+ private final int[] projected;
+ private final List<DataType> paimonDataTypeList;
private final PreExecutionAuthenticator preExecutionAuthenticator;
- private RecordReader.RecordIterator<InternalRow> recordIterator = null;
- private final long ctlId;
- private final long dbId;
- private final long tblId;
- private final String dbName;
- private final String tblName;
- private final String queryType;
+ private RecordReader<InternalRow> reader;
+ private RecordReader.RecordIterator<InternalRow> recordIterator;
/**
* Constructs a new PaimonSysTableJniScanner for reading Paimon system tables.
@@ -78,52 +70,60 @@ public class PaimonSysTableJniScanner extends JniScanner {
if (LOG.isDebugEnabled()) {
LOG.debug("params:{}", params);
}
- this.params = params;
String[] requiredFields = params.get("required_fields").split(",");
String[] requiredTypes = params.get("required_types").split("#");
ColumnType[] columnTypes = new ColumnType[requiredTypes.length];
for (int i = 0; i < requiredTypes.length; i++) {
columnTypes[i] = ColumnType.parseType(requiredFields[i], requiredTypes[i]);
}
+ String timeZone = params.getOrDefault("time_zone", TimeZone.getDefault().getID());
+ columnValue.setTimeZone(timeZone);
initTableInfo(columnTypes, requiredFields, batchSize);
- this.paimonSplit = PaimonUtils.deserialize(params.get("serialized_split"));
- this.ctlId = Long.parseLong(params.get("ctl_id"));
- this.dbId = Long.parseLong(params.get("db_id"));
- this.tblId = Long.parseLong(params.get("tbl_id"));
- this.dbName = params.get("db_name");
- this.tblName = params.get("tbl_name");
- this.queryType = params.get("query_type");
- this.hadoopOptionParams = params.entrySet().stream()
+ this.table = PaimonUtils.deserialize(params.get("serialized_table"));
+ this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
+ }
+ resetDatetimeV2Precision();
+ this.projected = Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
+ this.paimonDataTypeList = Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i))
+         .collect(Collectors.toList());
+ this.paimonSplits = Arrays.stream(params.get("serialized_splits").split(","))
+         .map(PaimonUtils::deserialize).map(obj -> (Split) obj)
+         .collect(Collectors.toList()).iterator();
+ Map<String, String> hadoopOptionParams = params.entrySet().stream()
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
- .collect(Collectors
- .toMap(kv1 ->
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
- Entry::getValue));
- this.paimonOptionParams = params.entrySet().stream()
- .filter(kv -> kv.getKey().startsWith(PAIMON_OPTION_PREFIX))
- .collect(Collectors
- .toMap(kv1 ->
kv1.getKey().substring(PAIMON_OPTION_PREFIX.length()),
- Entry::getValue));
+ .collect(Collectors.toMap(kv1 ->
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()),
+ Entry::getValue));
this.preExecutionAuthenticator =
PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
}
@Override
- public void open() {
+ public void open() throws IOException {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ if (!paimonSplits.hasNext()) {
+ throw new IOException("Failed to open PaimonSysTableJniScanner: No valid splits found");
+ }
+ nextReader();
+ }
+
+ private void nextReader() throws IOException {
+ Preconditions.checkArgument(paimonSplits.hasNext(), "No more splits available");
+ Split paimonSplit = paimonSplits.next();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ readBuilder.withProjection(projected);
try {
- // When the user does not specify hive-site.xml, Paimon will look for the file from the classpath:
- // org.apache.paimon.hive.HiveCatalog.createHiveConf:
- // `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
- // so we need to provide a classloader, otherwise it will cause NPE.
- Thread.currentThread().setContextClassLoader(classLoader);
preExecutionAuthenticator.execute(() -> {
- initTable();
- initReader();
+ reader = readBuilder.newRead().executeFilter().createReader(paimonSplit);
+ Preconditions.checkArgument(recordIterator == null);
+ recordIterator = reader.readBatch();
return null;
});
- resetDatetimeV2Precision();
-
- } catch (Throwable e) {
- LOG.warn("Failed to open paimon_scanner: {}", e.getMessage(), e);
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ this.close();
+ String msg = String.format("Failed to open next paimonSplit: %s",
paimonSplits);
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
}
}
@@ -139,48 +139,10 @@ public class PaimonSysTableJniScanner extends JniScanner {
try {
return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new IOException("Failed to getNext in
PaimonSysTableJniScanner", e);
}
}
- private void initTable() {
- PaimonTableCacheKey key = new PaimonTableCacheKey(ctlId, dbId, tblId,
- paimonOptionParams, hadoopOptionParams, dbName, tblName, queryType);
- TableExt tableExt = PaimonTableCache.getTable(key);
- Table paimonTable = tableExt.getTable();
- if (paimonTable == null) {
- throw new RuntimeException(
- String.format(
- "Failed to get Paimon system table
{%s}.{%s}${%s}. ",
- dbName, tblName, queryType));
- }
- this.table = paimonTable;
- this.paimonAllFieldNames = PaimonUtils.getFieldNames(this.table.rowType());
- if (LOG.isDebugEnabled()) {
- LOG.debug("paimonAllFieldNames:{}", paimonAllFieldNames);
- }
- }
-
- private void initReader() throws IOException {
- ReadBuilder readBuilder = table.newReadBuilder();
- if (this.fields.length > this.paimonAllFieldNames.size()) {
- throw new IOException(
- String.format(
- "The jni reader fields' size {%s} is not matched
with paimon fields' size {%s}."
- + " Please refresh table and try again",
- fields.length, paimonAllFieldNames.size()));
- }
- int[] projected = getProjected();
- readBuilder.withProjection(projected);
- reader = readBuilder.newRead().executeFilter().createReader(paimonSplit);
- paimonDataTypeList =
-         Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
- }
-
- private int[] getProjected() {
- return Arrays.stream(fields).mapToInt(paimonAllFieldNames::indexOf).toArray();
- }
-
private void resetDatetimeV2Precision() {
for (int i = 0; i < types.length; i++) {
if (types[i].isDateTimeV2()) {
@@ -199,35 +161,27 @@ public class PaimonSysTableJniScanner extends JniScanner {
private int readAndProcessNextBatch() throws IOException {
int rows = 0;
- try {
- if (recordIterator == null) {
- recordIterator = reader.readBatch();
- }
-
- while (recordIterator != null) {
- InternalRow record;
- while ((record = recordIterator.next()) != null) {
- columnValue.setOffsetRow(record);
- for (int i = 0; i < fields.length; i++) {
- columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
- long l = System.nanoTime();
- appendData(i, columnValue);
- appendDataTime += System.nanoTime() - l;
- }
- rows++;
- if (rows >= batchSize) {
- return rows;
- }
+ while (recordIterator != null) {
+ InternalRow record;
+ while ((record = recordIterator.next()) != null) {
+ columnValue.setOffsetRow(record);
+ for (int i = 0; i < fields.length; i++) {
+ columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
+ long l = System.nanoTime();
+ appendData(i, columnValue);
+ appendDataTime += System.nanoTime() - l;
+ }
+ rows++;
+ if (rows >= batchSize) {
+ return rows;
}
- recordIterator.releaseBatch();
- recordIterator = reader.readBatch();
}
- } catch (Exception e) {
- close();
- LOG.warn("Failed to get the next batch of paimon. "
- + "split: {}, requiredFieldNames: {},
paimonAllFieldNames: {}, dataType: {}",
- paimonSplit, params.get("required_fields"),
paimonAllFieldNames, paimonDataTypeList, e);
- throw new IOException(e);
+ recordIterator.releaseBatch();
+ recordIterator = reader.readBatch();
+ if (recordIterator == null && paimonSplits.hasNext()) {
+ // try to get next reader
+ nextReader();
+ }
}
return rows;
}
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
deleted file mode 100644
index e5f067af96b..00000000000
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonTableCache.java
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.paimon;
-
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-public class PaimonTableCache {
- // Max cache num of paimon table
- public static final long max_external_schema_cache_num = 50;
- // The expiration time of a cache object after last access of it.
- public static final long external_cache_expire_time_minutes_after_access = 100;
-
- private static final Logger LOG = LoggerFactory.getLogger(PaimonTableCache.class);
-
- private static LoadingCache<PaimonTableCacheKey, TableExt> tableCache = CacheBuilder.newBuilder()
-         .maximumSize(max_external_schema_cache_num)
-         .expireAfterAccess(external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
- .build(new CacheLoader<PaimonTableCacheKey, TableExt>() {
- @Override
- public TableExt load(PaimonTableCacheKey key) {
- return loadTable(key);
- }
- });
-
- private static TableExt loadTable(PaimonTableCacheKey key) {
- try {
- LOG.warn("load table:{}", key);
- Catalog catalog = createCatalog(key.getPaimonOptionParams(), key.getHadoopOptionParams());
- Table table;
- if (key.getQueryType() != null) {
- table = catalog.getTable(new Identifier(key.getDbName(), key.getTblName(),
-         null, key.getQueryType()));
- } else {
- table = catalog.getTable(Identifier.create(key.getDbName(), key.getTblName()));
- }
- return new TableExt(table, System.currentTimeMillis());
- } catch (Catalog.TableNotExistException e) {
- LOG.warn("failed to create paimon table ", e);
- throw new RuntimeException(e);
- }
- }
-
- private static Catalog createCatalog(
- Map<String, String> paimonOptionParams,
- Map<String, String> hadoopOptionParams) {
- Options options = new Options();
- paimonOptionParams.entrySet().stream().forEach(kv -> options.set(kv.getKey(), kv.getValue()));
- Configuration hadoopConf = new Configuration();
- hadoopOptionParams.entrySet().stream().forEach(kv -> hadoopConf.set(kv.getKey(), kv.getValue()));
- CatalogContext context = CatalogContext.create(options, hadoopConf);
- return CatalogFactory.createCatalog(context);
- }
-
- public static void invalidateTableCache(PaimonTableCacheKey key) {
- tableCache.invalidate(key);
- }
-
- public static TableExt getTable(PaimonTableCacheKey key) {
- try {
- return tableCache.get(key);
- } catch (ExecutionException e) {
- throw new RuntimeException("failed to get table for:" + key);
- }
- }
-
- public static class TableExt {
- private Table table;
- private long createTime;
-
- public TableExt(Table table, long createTime) {
- this.table = table;
- this.createTime = createTime;
- }
-
- public Table getTable() {
- return table;
- }
-
- public long getCreateTime() {
- return createTime;
- }
- }
-
- public static class PaimonTableCacheKey {
- // in key
- private final long ctlId;
- private final long dbId;
- private final long tblId;
-
- // not in key
- private Map<String, String> paimonOptionParams;
- private Map<String, String> hadoopOptionParams;
- private String dbName;
- private String tblName;
- private String queryType;
-
- public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
- Map<String, String> paimonOptionParams,
- Map<String, String> hadoopOptionParams,
- String dbName, String tblName) {
- this.ctlId = ctlId;
- this.dbId = dbId;
- this.tblId = tblId;
- this.paimonOptionParams = paimonOptionParams;
- this.hadoopOptionParams = hadoopOptionParams;
- this.dbName = dbName;
- this.tblName = tblName;
- }
-
- public PaimonTableCacheKey(long ctlId, long dbId, long tblId,
- Map<String, String> paimonOptionParams,
- Map<String, String> hadoopOptionParams,
- String dbName, String tblName, String queryType) {
- this.ctlId = ctlId;
- this.dbId = dbId;
- this.tblId = tblId;
- this.paimonOptionParams = paimonOptionParams;
- this.hadoopOptionParams = hadoopOptionParams;
- this.dbName = dbName;
- this.tblName = tblName;
- this.queryType = queryType;
- }
-
- public long getCtlId() {
- return ctlId;
- }
-
- public long getDbId() {
- return dbId;
- }
-
- public long getTblId() {
- return tblId;
- }
-
- public Map<String, String> getPaimonOptionParams() {
- return paimonOptionParams;
- }
-
- public Map<String, String> getHadoopOptionParams() {
- return hadoopOptionParams;
- }
-
- public String getDbName() {
- return dbName;
- }
-
- public String getTblName() {
- return tblName;
- }
-
- public String getQueryType() {
- return queryType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PaimonTableCacheKey that = (PaimonTableCacheKey) o;
- return ctlId == that.ctlId && dbId == that.dbId && tblId == that.tblId && Objects.equal(
- queryType,
- that.queryType);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(ctlId, dbId, tblId);
- }
-
- @Override
- public String toString() {
- return "PaimonTableCacheKey{"
- + "ctlId=" + ctlId
- + ", dbId=" + dbId
- + ", tblId=" + tblId
- + ", paimonOptionParams=" + paimonOptionParams
- + ", hadoopOptionParams=" + hadoopOptionParams
- + ", dbName='" + dbName + '\''
- + ", tblName='" + tblName + '\''
- + ", queryType='" + queryType + '\''
- + '}';
- }
- }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 1751d1733e8..3bfe7877c9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -87,7 +87,6 @@ public class PaimonScanNode extends FileQueryScanNode {
private static final String DORIS_START_TIMESTAMP = "startTimestamp";
private static final String DORIS_END_TIMESTAMP = "endTimestamp";
private static final String DORIS_INCREMENTAL_BETWEEN_SCAN_MODE = "incrementalBetweenScanMode";
- private static final String DEFAULT_INCREMENTAL_BETWEEN_SCAN_MODE = "auto";
private enum SplitReadType {
JNI,
@@ -217,15 +216,6 @@ public class PaimonScanNode extends FileQueryScanNode {
}
fileDesc.setFileFormat(fileFormat);
fileDesc.setPaimonPredicate(PaimonUtil.encodeObjectToString(predicates));
- fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
-         .collect(Collectors.joining(",")));
- fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
- fileDesc.setPaimonOptions(((PaimonExternalCatalog) source.getCatalog()).getPaimonOptionsMap());
- fileDesc.setTableName(source.getTargetTable().getName());
- fileDesc.setCtlId(source.getCatalog().getId());
- fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
- fileDesc.setTblId(source.getTargetTable().getId());
- fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
// The hadoop conf should be same with
// PaimonExternalCatalog.createCatalog()#getConfiguration()
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getBackendStorageProperties());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index ac6281112e5..5073dd13d53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -70,24 +70,58 @@ public class MetadataScanNode extends ExternalScanNode {
@Override
protected void createScanRangeLocations() {
- List<String> requiredFileds = desc.getSlots().stream()
+ List<String> requiredFields = desc.getSlots().stream()
.filter(slot -> slot.isMaterialized())
.map(slot -> slot.getColumn().getName())
.collect(java.util.stream.Collectors.toList());
- for (TMetaScanRange metaScanRange : tvf.getMetaScanRanges(requiredFileds)) {
- TScanRange scanRange = new TScanRange();
- scanRange.setMetaScanRange(metaScanRange);
+ TMetaScanRange metaScanRange = tvf.getMetaScanRange(requiredFields);
+ if (!metaScanRange.isSetSerializedSplits()) {
+ // no need to split ranges to send to backends
TScanRangeLocation location = new TScanRangeLocation();
Backend backend = backendPolicy.getNextBe();
location.setBackendId(backend.getId());
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+ TScanRange scanRange = new TScanRange();
+ scanRange.setMetaScanRange(metaScanRange);
+
TScanRangeLocations locations = new TScanRangeLocations();
locations.addToLocations(location);
locations.setScanRange(scanRange);
scanRangeLocations.add(locations);
+ } else {
+ // need to split ranges to send to backends
+ List<Backend> backends = Lists.newArrayList(backendPolicy.getBackends());
+ List<String> splits = metaScanRange.getSerializedSplits();
+ int numSplitsPerBE = Math.max(1, splits.size() / backends.size());
+
+ for (int i = 0; i < backends.size(); i++) {
+ int from = i * numSplitsPerBE;
+ if (from >= splits.size()) {
+ continue; // no splits for this backend
+ }
+ int to = Math.min((i + 1) * numSplitsPerBE, splits.size());
+
+ // set the split subset for this backend on TMetaScanRange
+ TMetaScanRange subRange = metaScanRange.deepCopy();
+ subRange.setSerializedSplits(splits.subList(from, to));
+
+ TScanRangeLocation location = new TScanRangeLocation();
+ Backend backend = backends.get(i);
+ location.setBackendId(backend.getId());
+ location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+
+ TScanRange scanRange = new TScanRange();
+ scanRange.setMetaScanRange(subRange);
+
+ TScanRangeLocations locations = new TScanRangeLocations();
+ locations.addToLocations(location);
+ locations.setScanRange(scanRange);
+
+ scanRangeLocations.add(locations);
+ }
}
}
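
For context on the MetadataScanNode change above: the serialized splits are cut into contiguous chunks, one per backend. A sketch of the assignment arithmetic, illustrative only (SplitAssignmentSketch is a hypothetical name). Note the sketch uses ceiling division so every split is covered; with plain floor division, splits.size() % backends.size() trailing splits would fall outside every backend's [from, to) range:

import java.util.List;

public class SplitAssignmentSketch {
    // Half-open [from, to) slice of the split list owned by backend i,
    // or null if that backend receives no splits.
    static int[] rangeFor(int i, int numSplits, int numBackends) {
        int perBe = Math.max(1, (numSplits + numBackends - 1) / numBackends); // ceil
        int from = i * perBe;
        if (from >= numSplits) {
            return null; // more backends than split chunks
        }
        return new int[] {from, Math.min((i + 1) * perBe, numSplits)};
    }

    public static void main(String[] args) {
        // 10 splits over 3 backends -> 0..4, 4..8, 8..10
        for (int i = 0; i < 3; i++) {
            int[] r = rangeFor(i, 10, 3);
            System.out.println(r == null ? "none" : r[0] + ".." + r[1]);
        }
    }
}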
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
index 826e1e7c59d..aca6779fc64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
@@ -33,7 +33,6 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -104,13 +103,13 @@ public class BackendsTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.BACKENDS);
TBackendsMetadataParams backendsMetadataParams = new TBackendsMetadataParams();
backendsMetadataParams.setClusterName("");
metaScanRange.setBackendsParams(backendsMetadataParams);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
index 75df6bb6221..35719f3fa52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
@@ -26,7 +26,6 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -79,9 +78,9 @@ public class CatalogsTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.CATALOGS);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
index d1b176f35a1..31ddf55313a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
@@ -32,7 +32,6 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -88,13 +87,13 @@ public class FrontendsDisksTableValuedFunction extends MetadataTableValuedFuncti
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.FRONTENDS_DISKS);
TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams();
frontendsMetadataParams.setClusterName("");
metaScanRange.setFrontendsParams(frontendsMetadataParams);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
index 5acf44f4d3b..efb90070e65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
@@ -32,7 +32,6 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
@@ -97,13 +96,13 @@ public class FrontendsTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.FRONTENDS);
TFrontendsMetadataParams frontendsMetadataParams = new TFrontendsMetadataParams();
frontendsMetadataParams.setClusterName("");
metaScanRange.setFrontendsParams(frontendsMetadataParams);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index 70e1ec84928..2125d420fa5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -120,7 +120,7 @@ public class HudiTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.HUDI);
// set hudi metadata params
@@ -130,7 +130,7 @@ public class HudiTableValuedFunction extends MetadataTableValuedFunction {
hudiMetadataParams.setDatabase(hudiTableName.getDb());
hudiMetadataParams.setTable(hudiTableName.getTbl());
metaScanRange.setHudiParams(hudiMetadataParams);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index d9ccc392084..359ad2ce57d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -31,12 +31,10 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.TIcebergMetadataParams;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.FileScanTask;
@@ -48,6 +46,8 @@ import org.apache.iceberg.util.SerializationUtil;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
/**
* The class of table valued function for iceberg metadata.
@@ -135,8 +135,7 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
- List<TMetaScanRange> scanRanges = Lists.newArrayList();
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
CloseableIterable<FileScanTask> tasks;
try {
tasks = preExecutionAuthenticator.execute(() -> {
@@ -145,17 +144,15 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
} catch (Exception e) {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
}
- for (FileScanTask task : tasks) {
- TMetaScanRange metaScanRange = new TMetaScanRange();
- metaScanRange.setMetadataType(TMetadataType.ICEBERG);
- // set iceberg metadata params
- TIcebergMetadataParams icebergMetadataParams = new TIcebergMetadataParams();
- icebergMetadataParams.setHadoopProps(hadoopProps);
- icebergMetadataParams.setSerializedTask(SerializationUtil.serializeToBase64(task));
- metaScanRange.setIcebergParams(icebergMetadataParams);
- scanRanges.add(metaScanRange);
- }
- return scanRanges;
+
+ TMetaScanRange tMetaScanRange = new TMetaScanRange();
+ tMetaScanRange.setMetadataType(TMetadataType.ICEBERG);
+ tMetaScanRange.setHadoopProps(hadoopProps);
+ tMetaScanRange.setSerializedTable(SerializationUtil.serializeToBase64(sysTable));
+ tMetaScanRange.setSerializedSplits(StreamSupport.stream(tasks.spliterator(), false)
+ .map(SerializationUtil::serializeToBase64)
+ .collect(Collectors.toList()));
+ return tMetaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index b5d0489d30c..d9c5ec5ed2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -31,7 +31,6 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
@@ -99,14 +98,14 @@ public class JobsTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.JOBS);
TJobsMetadataParams jobParam = new TJobsMetadataParams();
jobParam.setType(jobType.name());
jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setJobsParams(jobParam);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 92b30b0347b..32be139657c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -60,7 +60,7 @@ public abstract class MetadataTableValuedFunction extends TableValuedFunctionIf
public abstract TMetadataType getMetadataType();
- public abstract List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds);
+ public abstract TMetaScanRange getMetaScanRange(List<String> requiredFileds);
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index c40a8d4716d..e0c42165530 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -29,7 +29,6 @@ import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -106,7 +105,7 @@ public class MvInfosTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() start");
}
@@ -119,7 +118,7 @@ public class MvInfosTableValuedFunction extends MetadataTableValuedFunction {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() end");
}
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
index 6d5e1fba09d..5fdd3b1846b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PaimonTableValuedFunction.java
@@ -34,7 +34,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
-import org.apache.doris.thrift.TPaimonMetadataParams;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@@ -60,12 +59,7 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
private final Table paimonSysTable;
private final List<Column> schema;
private final Map<String, String> hadoopProps;
- private final Map<String, String> paimonProps;
private final ExecutionAuthenticator hadoopAuthenticator;
- private final TableName paimonTableName;
- private final long ctlId;
- private final long dbId;
- private final long tblId;
/**
* Creates a new Paimon table-valued function instance.
@@ -84,18 +78,14 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
throw new AnalysisException("Catalog " + paimonTableName.getCtl()
+ " is not a Paimon catalog");
}
- this.paimonTableName = paimonTableName;
PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog;
this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties();
- this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap();
this.hadoopAuthenticator = paimonExternalCatalog.getExecutionAuthenticator();
- this.ctlId = paimonExternalCatalog.getId();
ExternalDatabase<? extends ExternalTable> database = paimonExternalCatalog.getDb(paimonTableName.getDb())
.orElseThrow(() -> new AnalysisException(
String.format("Paimon catalog database '%s' does not exist", paimonTableName.getDb())
));
- this.dbId = database.getId();
ExternalTable externalTable = database.getTable(paimonTableName.getTbl())
.orElseThrow(() -> new AnalysisException(
@@ -103,7 +93,6 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
paimonTableName.getDb(),
paimonTableName.getTbl())
));
NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
- this.tblId = externalTable.getId();
this.paimonSysTable = paimonExternalCatalog.getPaimonTable(buildNameMapping,
"main", queryType);
@@ -148,7 +137,7 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
int[] projections = requiredFileds.stream().mapToInt(
field -> paimonSysTable.rowType().getFieldNames()
.stream()
@@ -157,7 +146,6 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
.indexOf(field))
.toArray();
List<Split> splits;
-
try {
splits = hadoopAuthenticator.execute(
() -> paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits());
@@ -165,7 +153,13 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
}
- return splits.stream().map(this::createMetaScanRange).collect(Collectors.toList());
+ TMetaScanRange tMetaScanRange = new TMetaScanRange();
+ tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
+ tMetaScanRange.setHadoopProps(hadoopProps);
+ tMetaScanRange.setSerializedTable(PaimonUtil.encodeObjectToString(paimonSysTable));
+ tMetaScanRange.setSerializedSplits(
+ splits.stream().map(PaimonUtil::encodeObjectToString).collect(Collectors.toList()));
+ return tMetaScanRange;
}
@Override
@@ -177,23 +171,4 @@ public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
public List<Column> getTableColumns() throws AnalysisException {
return schema;
}
-
- private TMetaScanRange createMetaScanRange(Split split) {
- TMetaScanRange tMetaScanRange = new TMetaScanRange();
- tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
-
- TPaimonMetadataParams tPaimonMetadataParams = new TPaimonMetadataParams();
- tPaimonMetadataParams.setCtlId(ctlId);
- tPaimonMetadataParams.setDbId(dbId);
- tPaimonMetadataParams.setTblId(tblId);
- tPaimonMetadataParams.setQueryType(queryType);
- tPaimonMetadataParams.setDbName(paimonTableName.getDb());
- tPaimonMetadataParams.setTblName(paimonTableName.getTbl());
- tPaimonMetadataParams.setHadoopProps(hadoopProps);
- tPaimonMetadataParams.setPaimonProps(paimonProps);
- tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split));
-
- tMetaScanRange.setPaimonParams(tPaimonMetadataParams);
- return tMetaScanRange;
- }
}
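
The projection logic in getMetaScanRange above maps each required field name to its position in the Paimon sys table's row type (the case-normalization step is elided between the hunks). A standalone sketch of that index computation; the field names are illustrative samples, not from the commit:

    import java.util.Arrays;
    import java.util.List;

    public class ProjectionSketch {
        public static void main(String[] args) {
            // Illustrative row type of a Paimon sys table.
            List<String> fieldNames = Arrays.asList("snapshot_id", "schema_id", "commit_time");
            List<String> requiredFileds = Arrays.asList("commit_time", "snapshot_id");
            int[] projections = requiredFileds.stream()
                    .mapToInt(fieldNames::indexOf) // -1 would indicate an unknown field
                    .toArray();
            System.out.println(Arrays.toString(projections)); // [2, 0]
        }
    }
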
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
index e0a0d4dc649..494a68edf3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
@@ -145,7 +145,7 @@ public class PartitionValuesTableValuedFunction extends MetadataTableValuedFunct
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() start");
}
@@ -156,7 +156,7 @@ public class PartitionValuesTableValuedFunction extends MetadataTableValuedFunct
partitionParam.setDatabase(databaseName);
partitionParam.setTable(tableName);
metaScanRange.setPartitionValuesParams(partitionParam);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
index 3ffb77cdbc6..53f99c549a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
@@ -43,7 +43,6 @@ import org.apache.doris.thrift.TPartitionsMetadataParams;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -211,7 +210,7 @@ public class PartitionsTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() start");
}
@@ -225,7 +224,7 @@ public class PartitionsTableValuedFunction extends MetadataTableValuedFunction {
if (LOG.isDebugEnabled()) {
LOG.debug("getMetaScanRange() end");
}
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index 5d60adbeacf..b343a7dbeed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -31,7 +31,6 @@ import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TTasksMetadataParams;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
@@ -99,14 +98,14 @@ public class TasksTableValuedFunction extends MetadataTableValuedFunction {
}
@Override
- public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
+ public TMetaScanRange getMetaScanRange(List<String> requiredFileds) {
TMetaScanRange metaScanRange = new TMetaScanRange();
metaScanRange.setMetadataType(TMetadataType.TASKS);
TTasksMetadataParams taskParam = new TTasksMetadataParams();
taskParam.setType(jobType.name());
taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
metaScanRange.setTasksParams(taskParam);
- return Lists.newArrayList(metaScanRange);
+ return metaScanRange;
}
@Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 12b0f412561..adc5be9cd88 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -549,12 +549,13 @@ struct TDataGenScanRange {
}
+// deprecated
struct TIcebergMetadataParams {
1: optional string serialized_task
2: optional map<string, string> hadoop_props
}
-
+// deprecated
struct TPaimonMetadataParams {
1: optional string db_name
2: optional string tbl_name
@@ -624,7 +625,7 @@ struct TMetaCacheStatsParams {
struct TMetaScanRange {
1: optional Types.TMetadataType metadata_type
- 2: optional TIcebergMetadataParams iceberg_params
+ 2: optional TIcebergMetadataParams iceberg_params // deprecated
3: optional TBackendsMetadataParams backends_params
4: optional TFrontendsMetadataParams frontends_params
5: optional TQueriesMetadataParams queries_params
@@ -635,7 +636,12 @@ struct TMetaScanRange {
10: optional TMetaCacheStatsParams meta_cache_stats_params
11: optional TPartitionValuesMetadataParams partition_values_params
12: optional THudiMetadataParams hudi_params
- 13: optional TPaimonMetadataParams paimon_params
+ 13: optional TPaimonMetadataParams paimon_params // deprecated
+
+ // for querying sys tables for Paimon/Iceberg
+ 14: optional map<string, string> hadoop_props
+ 15: optional string serialized_table;
+ 16: optional list<string> serialized_splits;
}
// Specification of an individual data range which is held in its entirety
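
The three new generic fields replace the deprecated per-engine param structs: the FE Base64-encodes the sys table and its splits, and the BE-side JNI scanners decode them. A round-trip sketch, assuming the helper behind the serializeToBase64 calls above is Iceberg's org.apache.iceberg.util.SerializationUtil (the String payload is a stand-in for a Serializable table or split):

    import org.apache.iceberg.util.SerializationUtil;

    public class SerializationSketch {
        public static void main(String[] args) {
            // Stand-in payload; the FE serializes the sys table and each split this way.
            String payload = "example-split";
            String encoded = SerializationUtil.serializeToBase64(payload);
            String decoded = SerializationUtil.deserializeFromBase64(encoded);
            System.out.println(payload.equals(decoded)); // true
        }
    }
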
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out
new file mode 100644
index 00000000000..6c4acf47ef8
Binary files /dev/null and b/regression-test/data/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.out differ
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy
new file mode 100644
index 00000000000..68a9a06522c
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_timestamp_with_time_zone.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_timestamp_with_time_zone", "p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String catalog_name = "test_paimon_timestamp_with_time_zone"
+ String db_name = "test_paimon_spark"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+
+ sql """drop catalog if exists ${catalog_name}"""
+
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true'
+ );
+ """
+ sql """use `${catalog_name}`.`${db_name}`;"""
+
+ def test_select_timestamp = {
+ qt_select_timestamp """ select * from t_ts_ntz order by id; """
+ }
+
+ try {
+ sql """ set time_zone = 'Asia/Shanghai'; """
+ sql """ set force_jni_scanner = true; """
+ test_select_timestamp()
+ sql """ set force_jni_scanner = false; """
+ test_select_timestamp()
+ sql """ set time_zone = '+10:00'; """
+ sql """ set force_jni_scanner = true; """
+ test_select_timestamp()
+ sql """ set force_jni_scanner = false; """
+ test_select_timestamp()
+ } finally {
+ sql """ unset variable time_zone; """
+ sql """ set force_jni_scanner = false; """
+ }
+ }
+}
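
The new regression test queries a Paimon timestamp table under different session time_zone settings, with the JNI scanner both forced on and off, and checks the results agree. The conversion the scanner must get right is plain epoch-to-zone math; a minimal java.time sketch (the epoch value is a sample input, not from the test data):

    import java.time.Instant;
    import java.time.ZoneId;

    public class TimeZoneSketch {
        public static void main(String[] args) {
            long epochMillis = 1724212800000L;           // sample instant
            ZoneId session = ZoneId.of("Asia/Shanghai"); // e.g. from `set time_zone`
            // The same instant rendered in two session zones differs only by wall-clock offset.
            System.out.println(Instant.ofEpochMilli(epochMillis).atZone(session));
            System.out.println(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of("+10:00")));
        }
    }
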
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]