This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 7e145bbc3e9 branch-3.1: [fix](hudi)fix hudi get current schema for not partitioned table. #51152 (#52623)
7e145bbc3e9 is described below
commit 7e145bbc3e98084c3315679b3bc0ee3b6c62043c
Author: daidai <[email protected]>
AuthorDate: Wed Jul 2 18:05:23 2025 +0800
branch-3.1: [fix](hudi)fix hudi get current schema for not partitioned table. #51152 (#52623)
bp #51152
---
.../doris/datasource/hive/HMSExternalTable.java | 3 +--
.../apache/doris/datasource/hive/HudiDlaTable.java | 15 +++++++++-----
.../doris/datasource/hudi/HudiMvccSnapshot.java | 8 ++++++-
.../apache/doris/datasource/hudi/HudiUtils.java | 23 +++++++++++++++++++++
.../hudi/source/HudiCachedPartitionProcessor.java | 4 ----
.../doris/datasource/hudi/source/HudiScanNode.java | 18 ++++++++++++++--
.../hudi/test_hudi_schema_change.out | Bin 10726 -> 10684 bytes
.../hudi/test_hudi_schema_change.groovy | 2 +-
8 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index fde8ddf3b4d..42ded3a0a0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -36,7 +36,6 @@ import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
-import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
@@ -1049,7 +1048,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
@Override
public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() == DLAType.HUDI) {
- return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+ return HudiUtils.getHudiMvccSnapshot(tableSnapshot, this);
} else if (getDlaType() == DLAType.ICEBERG) {
return new IcebergMvccSnapshot(
IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, getCatalog(), getDbName(), getName()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
index 24963ec10c5..2c56353c4e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
@@ -97,11 +97,6 @@ public class HudiDlaTable extends HMSDlaTable {
return true;
}
- public HMSSchemaCacheValue getHudiSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
- TablePartitionValues snapshotCacheValue = getOrFetchHudiSnapshotCacheValue(snapshot);
- return getHudiSchemaCacheValue(snapshotCacheValue.getLastUpdateTimestamp());
- }
-
private TablePartitionValues getOrFetchHudiSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
if (snapshot.isPresent()) {
return ((HudiMvccSnapshot) snapshot.get()).getTablePartitionValues();
@@ -110,6 +105,16 @@ public class HudiDlaTable extends HMSDlaTable {
}
}
+ public HMSSchemaCacheValue getHudiSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
+ long timestamp = 0L;
+ if (snapshot.isPresent()) {
+ timestamp = ((HudiMvccSnapshot) snapshot.get()).getTimestamp();
+ } else {
+ timestamp = HudiUtils.getLastTimeStamp(hmsTable);
+ }
+ return getHudiSchemaCacheValue(timestamp);
+ }
+
private HMSSchemaCacheValue getHudiSchemaCacheValue(long timestamp) {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog());
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
index 0f01291e54c..8821d113a7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
@@ -27,6 +27,7 @@ import org.apache.doris.datasource.mvcc.MvccSnapshot;
*/
public class HudiMvccSnapshot implements MvccSnapshot {
private final TablePartitionValues tablePartitionValues;
+ private final long timestamp;
/**
* Creates a new HudiMvccSnapshot with the specified partition values.
@@ -34,13 +35,18 @@ public class HudiMvccSnapshot implements MvccSnapshot {
* @param tablePartitionValues The partition values for the snapshot
* @throws IllegalArgumentException if tablePartitionValues is null
*/
- public HudiMvccSnapshot(TablePartitionValues tablePartitionValues) {
+ public HudiMvccSnapshot(TablePartitionValues tablePartitionValues, Long timeStamp) {
if (tablePartitionValues == null) {
throw new IllegalArgumentException("TablePartitionValues cannot be null");
}
+ this.timestamp = timeStamp;
this.tablePartitionValues = tablePartitionValues;
}
+ public long getTimestamp() {
+ return timestamp;
+ }
+
/**
* Gets the table partition values associated with this snapshot.
*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index 376aaaa79fe..1960213ec3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -255,6 +255,29 @@ public class HudiUtils {
return Type.UNSUPPORTED;
}
+ public static HudiMvccSnapshot getHudiMvccSnapshot(Optional<TableSnapshot> tableSnapshot,
+ HMSExternalTable hmsTable) {
+ long timestamp = 0L;
+ if (tableSnapshot.isPresent()) {
+ String queryInstant = tableSnapshot.get().getTime().replaceAll("[-: ]", "");
+ timestamp = Long.parseLong(queryInstant);
+ } else {
+ timestamp = getLastTimeStamp(hmsTable);
+ }
+
+ return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, hmsTable), timestamp);
+ }
+
+ public static long getLastTimeStamp(HMSExternalTable hmsTable) {
+ HoodieTableMetaClient hudiClient = hmsTable.getHudiClient();
+ HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+ Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
+ if (!snapshotInstant.isPresent()) {
+ return 0L;
+ }
+ return Long.parseLong(snapshotInstant.get().getTimestamp());
+ }
+
public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> tableSnapshot,
HMSExternalTable hmsTable) {
TablePartitionValues partitionValues = new TablePartitionValues();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index ef921cbfa47..f302458f8ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -143,10 +143,6 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
partitionValues.writeLock().lock();
try {
- long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp();
- if (lastTimestamp <= lastUpdateTimestamp) {
- return partitionValues;
- }
HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog();
List<String> partitionNames;
if (useHiveSyncPartition) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 01588e62591..f1747fd2632 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -55,11 +55,13 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.storage.StoragePath;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -274,8 +276,20 @@ public class HudiScanNode extends HiveScanNode {
.getCommitInstantInternalSchema(hudiClient, commitInstantTime);
params.history_schema_info.computeIfAbsent(
internalSchema.schemaId(),
- k -> HudiUtils.getSchemaInfo(internalSchema));
- fileDesc.setSchemaId(internalSchema.schemaId()); //for schema change. (native reader)
+ k -> HudiUtils.getSchemaInfo(internalSchema)); //for schema change. (native reader)
+ fileDesc.setSchemaId(internalSchema.schemaId());
+ } else {
+ try {
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient);
+ InternalSchema internalSchema =
+ AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema(true));
+ params.history_schema_info.computeIfAbsent(
+ internalSchema.schemaId(),
+ k -> HudiUtils.getSchemaInfo(internalSchema)); // Handle column name case for BE.
+ fileDesc.setSchemaId(internalSchema.schemaId());
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot get hudi table schema.", e);
+ }
}
}
tableFormatFileDesc.setHudiParams(fileDesc);
diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out b/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out
index 13c9f553564..50ae6ba756a 100644
Binary files a/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out and b/regression-test/data/external_table_p2/hudi/test_hudi_schema_change.out differ
diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy
index 648a4079a6e..3b9b7a2dde9 100644
--- a/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy
+++ b/regression-test/suites/external_table_p2/hudi/test_hudi_schema_change.groovy
@@ -63,7 +63,7 @@ suite("test_hudi_schema_change", "p2,external,hudi,external_remote,external_remo
}
- qt_orc_time_travel """ select * from hudi_sc_orc_cow FOR TIME AS OF "20250314162817433_0_0" order by id; """ //1-8
+ qt_orc_time_travel """ select * from hudi_sc_orc_cow FOR TIME AS OF "20250314162817433" order by id; """ //1-8
qt_parquet_time_travel """ select * from hudi_sc_parquet_cow FOR TIME AS OF "20250314163425482" order by id; """//1-6
qt_parquet_inc_1 """ SELECT * from hudi_sc_parquet_cow@incr('beginTime'='20250314163421827') order by id; """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]