This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d3177ba67d8 branch-3.1: Revert "branch-3.1: [opt](hive) Speed up Hive insert on partition tables using cache #58166 #58606 #58748 (#58886)" (#59348)
d3177ba67d8 is described below
commit d3177ba67d86c29b136d11de39c2f951707e72f3
Author: zy-kkk <[email protected]>
AuthorDate: Thu Dec 25 16:19:44 2025 +0800
branch-3.1: Revert "branch-3.1: [opt](hive) Speed up Hive insert on partition tables using cache #58166 #58606 #58748 (#58886)" (#59348)
revert #58932
---
.../org/apache/doris/catalog/RefreshManager.java | 21 +----
.../doris/common/profile/SummaryProfile.java | 17 ----
.../apache/doris/datasource/ExternalObjectLog.java | 20 -----
.../doris/datasource/hive/HMSTransaction.java | 98 +++++-----------------
.../doris/datasource/hive/HiveMetaStoreCache.java | 62 --------------
.../insert/BaseExternalTableInsertExecutor.java | 23 ++---
.../plans/commands/insert/HiveInsertExecutor.java | 50 -----------
.../org/apache/doris/planner/HiveTableSink.java | 39 ++-------
.../apache/doris/planner/HiveTableSinkTest.java | 9 --
.../hive/test_hive_partitions.groovy | 97 ---------------------
10 files changed, 36 insertions(+), 400 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 2802055a887..37433ef5f6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -26,9 +26,7 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.persist.OperationType;
import com.google.common.base.Strings;
@@ -185,24 +183,7 @@ public class RefreshManager {
db.get().unregisterTable(log.getTableName());
db.get().resetMetaCacheNames();
} else {
- List<String> modifiedPartNames = log.getPartitionNames();
- List<String> newPartNames = log.getNewPartitionNames();
- if (catalog instanceof HMSExternalCatalog
- && ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
- || (newPartNames != null && !newPartNames.isEmpty()))) {
- // Partition-level cache invalidation, only for hive catalog
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog) catalog);
- cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
- LOG.info("replay refresh partitions for table {}, "
- + "modified partitions count: {}, "
- + "new partitions count: {}",
- table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
- newPartNames == null ? 0 : newPartNames.size());
- } else {
- // Full table cache invalidation
- refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
- }
+ refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 87516a19a77..1410f961881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -83,7 +83,6 @@ public class SummaryProfile {
public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
- public static final String SINK_SET_PARTITION_VALUES_TIME = "Sink Set Partition Values Time";
public static final String PLAN_TIME = "Plan Time";
public static final String SCHEDULE_TIME = "Schedule Time";
public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
@@ -159,7 +158,6 @@ public class SummaryProfile {
GET_SPLITS_TIME,
GET_PARTITIONS_TIME,
GET_PARTITION_FILES_TIME,
- SINK_SET_PARTITION_VALUES_TIME,
CREATE_SCAN_RANGE_TIME,
DISTRIBUTE_TIME,
GET_META_VERSION_TIME,
@@ -210,7 +208,6 @@ public class SummaryProfile {
.put(NEREIDS_BE_FOLD_CONST_TIME, 2)
.put(GET_PARTITIONS_TIME, 3)
.put(GET_PARTITION_FILES_TIME, 3)
- .put(SINK_SET_PARTITION_VALUES_TIME, 3)
.put(CREATE_SCAN_RANGE_TIME, 2)
.put(GET_PARTITION_VERSION_TIME, 1)
.put(GET_PARTITION_VERSION_COUNT, 1)
@@ -280,10 +277,6 @@ public class SummaryProfile {
private long getPartitionsFinishTime = -1;
@SerializedName(value = "getPartitionFilesFinishTime")
private long getPartitionFilesFinishTime = -1;
- @SerializedName(value = "sinkSetPartitionValuesStartTime")
- private long sinkSetPartitionValuesStartTime = -1;
- @SerializedName(value = "sinkSetPartitionValuesFinishTime")
- private long sinkSetPartitionValuesFinishTime = -1;
@SerializedName(value = "getSplitsFinishTime")
private long getSplitsFinishTime = -1;
@SerializedName(value = "createScanRangeFinishTime")
@@ -470,8 +463,6 @@ public class SummaryProfile {
getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
- executionSummaryProfile.addInfoString(SINK_SET_PARTITION_VALUES_TIME,
- getPrettyTime(sinkSetPartitionValuesFinishTime, sinkSetPartitionValuesStartTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(SCHEDULE_TIME,
@@ -610,14 +601,6 @@ public class SummaryProfile {
this.getPartitionsFinishTime = TimeUtils.getStartTimeMs();
}
- public void setSinkGetPartitionsStartTime() {
- this.sinkSetPartitionValuesStartTime = TimeUtils.getStartTimeMs();
- }
-
- public void setSinkGetPartitionsFinishTime() {
- this.sinkSetPartitionValuesFinishTime = TimeUtils.getStartTimeMs();
- }
-
public void setGetPartitionFilesFinishTime() {
this.getPartitionFilesFinishTime = TimeUtils.getStartTimeMs();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index 43a0675d783..cf7872f8b39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -58,9 +58,6 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "partitionNames")
private List<String> partitionNames;
- @SerializedName(value = "newPartitionNames")
- private List<String> newPartitionNames;
-
@SerializedName(value = "lastUpdateTime")
private long lastUpdateTime;
@@ -83,17 +80,6 @@ public class ExternalObjectLog implements Writable {
return externalObjectLog;
}
- public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
- List<String> modifiedPartNames, List<String> newPartNames) {
- ExternalObjectLog externalObjectLog = new ExternalObjectLog();
- externalObjectLog.setCatalogId(catalogId);
- externalObjectLog.setDbName(dbName);
- externalObjectLog.setTableName(tblName);
- externalObjectLog.setPartitionNames(modifiedPartNames);
- externalObjectLog.setNewPartitionNames(newPartNames);
- return externalObjectLog;
- }
-
public static ExternalObjectLog createForRenameTable(long catalogId, String dbName, String tblName,
String newTblName) {
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
@@ -138,12 +124,6 @@ public class ExternalObjectLog implements Writable {
} else {
sb.append("tableId: " + tableId + "]");
}
- if (partitionNames != null && !partitionNames.isEmpty()) {
- sb.append(", partitionNames: " + partitionNames);
- }
- if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
- sb.append(", newPartitionNames: " + newPartitionNames);
- }
return sb.toString();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 182e214b00c..2864f146f03 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -47,7 +47,6 @@ import org.apache.doris.transaction.Transaction;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -269,48 +268,30 @@ public class HMSTransaction implements Transaction {
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
break;
case NEW:
- // Check if partition really exists in HMS (may be cache miss in Doris)
- String partitionName = pu.getName();
- if (Strings.isNullOrEmpty(partitionName)) {
- // This should not happen for partitioned tables
- LOG.warn("Partition name is null/empty for NEW
mode in partitioned table, skipping");
- break;
- }
- List<String> partitionValues = HiveUtil.toPartitionValues(partitionName);
- boolean existsInHMS = false;
- try {
- Partition hmsPartition = hiveOps.getClient().getPartition(
- nameMapping.getRemoteDbName(),
- nameMapping.getRemoteTblName(),
- partitionValues);
- existsInHMS = (hmsPartition != null);
- } catch (Exception e) {
- // Partition not found in HMS, treat as truly new
- if (LOG.isDebugEnabled()) {
- LOG.debug("Partition {} not found in HMS, will
create it", pu.getName());
- }
- }
-
- if (existsInHMS) {
- // Partition exists in HMS but not in Doris cache
- // Treat as APPEND instead of NEW to avoid creation error
- LOG.info("Partition {} already exists in HMS (Doris cache miss), treating as APPEND",
- pu.getName());
- insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
- } else {
- // Truly new partition, create it
- createAndAddPartition(nameMapping, table, partitionValues, writePath,
- pu, hivePartitionStatistics, false);
- }
- break;
case OVERWRITE:
- String overwritePartitionName = pu.getName();
- if (Strings.isNullOrEmpty(overwritePartitionName)) {
- LOG.warn("Partition name is null/empty for
OVERWRITE mode in partitioned table, skipping");
- break;
+ StorageDescriptor sd = table.getSd();
+ // For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
+ // For HDFS, use targetPath which is the final path after rename
+ String pathForHMS = this.fileType == TFileType.FILE_S3
+ ? writePath
+ : pu.getLocation().getTargetPath();
+ HivePartition hivePartition = new HivePartition(
+ nameMapping,
+ false,
+ sd.getInputFormat(),
+ pathForHMS,
+ HiveUtil.toPartitionValues(pu.getName()),
+ Maps.newHashMap(),
+ sd.getOutputFormat(),
+ sd.getSerdeInfo().getSerializationLib(),
+ sd.getCols()
+ );
+ if (updateMode == TUpdateMode.OVERWRITE) {
+ dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
}
- createAndAddPartition(nameMapping, table, HiveUtil.toPartitionValues(overwritePartitionName),
- writePath, pu, hivePartitionStatistics, true);
+ addPartition(
+ nameMapping, hivePartition, writePath,
+ pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in partitioned table");
@@ -396,10 +377,6 @@ public class HMSTransaction implements Transaction {
return hivePartitionUpdates.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
}
- public List<THivePartitionUpdate> getHivePartitionUpdates() {
- return hivePartitionUpdates;
- }
-
private void convertToInsertExistingPartitionAction(
NameMapping nameMapping,
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
@@ -1051,37 +1028,6 @@ public class HMSTransaction implements Transaction {
}
}
- private void createAndAddPartition(
- NameMapping nameMapping,
- Table table,
- List<String> partitionValues,
- String writePath,
- THivePartitionUpdate pu,
- HivePartitionStatistics hivePartitionStatistics,
- boolean dropFirst) {
- StorageDescriptor sd = table.getSd();
- String pathForHMS = this.fileType == TFileType.FILE_S3
- ? writePath
- : pu.getLocation().getTargetPath();
- HivePartition hivePartition = new HivePartition(
- nameMapping,
- false,
- sd.getInputFormat(),
- pathForHMS,
- partitionValues,
- Maps.newHashMap(),
- sd.getOutputFormat(),
- sd.getSerdeInfo().getSerializationLib(),
- sd.getCols()
- );
- if (dropFirst) {
- dropPartition(nameMapping, hivePartition.getPartitionValues(), true);
- }
- addPartition(
- nameMapping, hivePartition, writePath,
- pu.getName(), pu.getFileNames(), hivePartitionStatistics, pu);
- }
-
public synchronized void addPartition(
NameMapping nameMapping,
HivePartition partition,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index d1d6f6fc563..7180551f64a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -60,7 +60,6 @@ import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
@@ -562,67 +561,6 @@ public class HiveMetaStoreCache {
}
}
- /**
- * Selectively refreshes cache for affected partitions based on update information from BE.
- * For APPEND/OVERWRITE: invalidate both partition cache and file cache using existing method.
- * For NEW: add to partition values cache.
- *
- * @param table The Hive table whose partitions were modified
- * @param partitionUpdates List of partition updates from BE
- * @param modifiedPartNames Output list to collect names of modified partitions
- * @param newPartNames Output list to collect names of new partitions
- */
- public void refreshAffectedPartitions(HMSExternalTable table,
- List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
- List<String> modifiedPartNames, List<String> newPartNames) {
- if (partitionUpdates == null || partitionUpdates.isEmpty()) {
- return;
- }
-
- for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
- String partitionName = update.getName();
- // Skip if partition name is null/empty (non-partitioned table case)
- if (Strings.isNullOrEmpty(partitionName)) {
- continue;
- }
-
- switch (update.getUpdateMode()) {
- case APPEND:
- case OVERWRITE:
- modifiedPartNames.add(partitionName);
- break;
- case NEW:
- newPartNames.add(partitionName);
- break;
- default:
- LOG.warn("Unknown update mode {} for partition {}",
- update.getUpdateMode(), partitionName);
- break;
- }
- }
-
- refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
- }
-
- public void refreshAffectedPartitionsCache(HMSExternalTable table,
- List<String> modifiedPartNames, List<String> newPartNames) {
-
- // Invalidate cache for modified partitions (both partition cache and file cache)
- for (String partitionName : modifiedPartNames) {
- invalidatePartitionCache(table, partitionName);
- }
-
- // Add new partitions to partition values cache
- if (!newPartNames.isEmpty()) {
- addPartitionsCache(table.getOrBuildNameMapping(), newPartNames, table.getPartitionColumnTypes(Optional.empty()));
- }
-
- // Log summary
- LOG.info("Refreshed cache for table {}: {} modified partitions, {} new
partitions",
- table.getName(), modifiedPartNames.size(),
newPartNames.size());
- }
-
public void invalidateDbCache(String dbName) {
long start = System.currentTimeMillis();
Set<PartitionValueCacheKey> keys = partitionValuesCache.asMap().keySet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index f82fefd89f2..93693233e11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
@@ -111,26 +110,14 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
}
summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
txnStatus = TransactionStatus.COMMITTED;
-
- // Handle post-commit operations (e.g., cache refresh)
- doAfterCommit();
+ Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
+ catalogName,
+ table.getDatabase().getFullName(),
+ table.getName(),
+ true);
}
}
- /**
- * Called after transaction commit.
- * Subclasses can override this to customize post-commit behavior.
- * Default: full table refresh.
- */
- protected void doAfterCommit() throws DdlException {
- // Default: full table refresh
- Env.getCurrentEnv().getRefreshManager().handleRefreshTable(
- catalogName,
- table.getDatabase().getFullName(),
- table.getName(),
- true);
- }
-
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index f9345a4495c..64f68454d84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -17,28 +17,20 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSTransaction;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionType;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.List;
import java.util.Optional;
/**
@@ -47,8 +39,6 @@ import java.util.Optional;
public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
- private List<THivePartitionUpdate> partitionUpdates;
-
/**
* constructor
*/
@@ -75,46 +65,6 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
transaction.finishInsertTable(((ExternalTable) table).getOrBuildNameMapping());
-
- // Save partition updates for cache refresh after commit
- partitionUpdates = transaction.getHivePartitionUpdates();
- }
-
- @Override
- protected void doAfterCommit() throws DdlException {
- HMSExternalTable hmsTable = (HMSExternalTable) table;
-
- // For partitioned tables, do selective partition refresh
- // For non-partitioned tables, do full table cache invalidation
- List<String> modifiedPartNames = Lists.newArrayList();
- List<String> newPartNames = Lists.newArrayList();
- if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
- cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
- } else {
- // Non-partitioned table or no partition updates, do full table refresh
- Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
- }
-
- // Write edit log to notify other FEs
- ExternalObjectLog log;
- if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
- // Partition-level refresh for other FEs
- log = ExternalObjectLog.createForRefreshPartitions(
- hmsTable.getCatalog().getId(),
- table.getDatabase().getFullName(),
- table.getName(),
- modifiedPartNames,
- newPartNames);
- } else {
- // Full table refresh for other FEs
- log = ExternalObjectLog.createForRefreshTable(
- hmsTable.getCatalog().getId(),
- table.getDatabase().getFullName(),
- table.getName());
- }
- Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 2ae4ae8e397..68a0edc430f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -21,16 +21,12 @@
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
-import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveProperties;
-import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -195,32 +191,18 @@ public class HiveTableSink extends BaseExternalTableDataSink {
}
private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
- if (ConnectContext.get().getExecutor() != null) {
- ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsStartTime();
- }
-
List<THivePartition> partitions = new ArrayList<>();
-
- List<HivePartition> hivePartitions = new ArrayList<>();
- if (targetTable.isPartitionedTable()) {
- // Get partitions from cache instead of HMS client (similar to HiveScanNode)
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog) targetTable.getCatalog());
- HiveMetaStoreCache.HivePartitionValues partitionValues =
- targetTable.getHivePartitionValues(MvccUtil.getSnapshotFromContext(targetTable));
- List<List<String>> partitionValuesList =
- new ArrayList<>(partitionValues.getPartitionValuesMap().values());
- hivePartitions = cache.getAllPartitionsWithCache(targetTable, partitionValuesList);
- }
-
- // Convert HivePartition to THivePartition (same logic as before)
- for (HivePartition partition : hivePartitions) {
+ List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
+ ((HMSExternalCatalog) targetTable.getCatalog())
+ .getClient().listPartitions(targetTable.getRemoteDbName(), targetTable.getRemoteName());
+ for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
THivePartition hivePartition = new THivePartition();
- hivePartition.setFileFormat(getTFileFormatType(partition.getInputFormat()));
- hivePartition.setValues(partition.getPartitionValues());
+ StorageDescriptor sd = partition.getSd();
+ hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
+ hivePartition.setValues(partition.getValues());
THiveLocationParams locationParams = new THiveLocationParams();
- String location = partition.getPath();
+ String location = sd.getLocation();
// pass the same of write path and target path to partition
locationParams.setWritePath(location);
locationParams.setTargetPath(location);
@@ -228,12 +210,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
hivePartition.setLocation(locationParams);
partitions.add(hivePartition);
}
-
tSink.setPartitions(partitions);
-
- if (ConnectContext.get().getExecutor() != null) {
- ConnectContext.get().getExecutor().getSummaryProfile().setSinkGetPartitionsFinishTime();
- }
}
private void setSerDeProperties(THiveTableSink tSink) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
index 8f4adff727c..c8bc82e67a1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/HiveTableSinkTest.java
@@ -28,7 +28,6 @@ import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.ThriftHMSCachedClient;
import org.apache.doris.datasource.property.storage.StorageProperties;
-import org.apache.doris.qe.ConnectContext;
import mockit.Mock;
import mockit.MockUp;
@@ -49,13 +48,10 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-
public class HiveTableSinkTest {
@Test
public void testBindDataSink() throws UserException {
- ConnectContext ctx = new ConnectContext();
- ctx.setThreadLocalInfo();
new MockUp<ThriftHMSCachedClient>() {
@Mock
@@ -127,11 +123,6 @@ public class HiveTableSinkTest {
private void mockDifferLocationTable(String location) {
new MockUp<HMSExternalTable>() {
- @Mock
- public boolean isPartitionedTable() {
- return false;
- }
-
@Mock
public Set<String> getPartitionColumnNames() {
return new HashSet<String>() {{
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
index b8b3a39a2f9..cc3425106a5 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_partitions.groovy
@@ -77,7 +77,6 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
}
for (String hivePrefix : ["hive2", "hive3"]) {
- setHivePrefix(hivePrefix)
try {
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
String catalog_name = "${hivePrefix}_test_partitions"
@@ -92,102 +91,6 @@ suite("test_hive_partitions", "p0,external,hive,external_docker,external_docker_
q01()
- // Test cache miss scenario: Hive adds partition, then Doris writes to it
- def test_cache_miss = {
- def dbName = "test_cache_miss_db"
- def tblName = "test_cache_miss_table"
-
- try {
- // Clean up
- hive_docker """DROP TABLE IF EXISTS ${dbName}.${tblName}"""
- hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
-
- // Create database and partitioned table in Hive
- hive_docker """CREATE DATABASE ${dbName}"""
- hive_docker """
- CREATE TABLE ${dbName}.${tblName} (
- id INT,
- name STRING
- )
- PARTITIONED BY (pt INT)
- STORED AS ORC
- """
-
- // Hive writes 3 partitions
- hive_docker """
- INSERT INTO ${dbName}.${tblName} PARTITION(pt=1)
- VALUES (1, 'hive_pt1')
- """
- hive_docker """
- INSERT INTO ${dbName}.${tblName} PARTITION(pt=2)
- VALUES (2, 'hive_pt2')
- """
- hive_docker """
- INSERT INTO ${dbName}.${tblName} PARTITION(pt=3)
- VALUES (3, 'hive_pt3')
- """
-
- sql """refresh catalog `${catalog_name}`"""
- // Doris reads data to populate cache (only knows about 3 partitions)
- def result1 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
- assertEquals(3, result1[0][0])
- logger.info("Doris cache populated with 3 partitions")
-
- // Hive writes 4th partition (Doris cache doesn't know about it)
- hive_docker """
- INSERT INTO ${dbName}.${tblName} PARTITION(pt=4)
- VALUES (4, 'hive_pt4')
- """
- logger.info("Hive added 4th partition (pt=4)")
-
- // Doris writes to the 4th partition
- // This should trigger cache miss detection and treat as APPEND instead of NEW
- sql """
- INSERT INTO `${catalog_name}`.`${dbName}`.`${tblName}`
- VALUES (40, 'doris_pt4', 4)
- """
- logger.info("Doris wrote to 4th partition (should handle
cache miss)")
-
- // Verify: should have 5 rows total (3 from hive + 1 from hive pt4 + 1 from doris pt4)
- def result2 = sql """SELECT COUNT(*) as cnt FROM `${catalog_name}`.`${dbName}`.`${tblName}`"""
- assertEquals(5, result2[0][0])
-
- // Verify partition 4 has 2 rows
- def result3 = sql """
- SELECT COUNT(*) as cnt
- FROM `${catalog_name}`.`${dbName}`.`${tblName}`
- WHERE pt = 4
- """
- assertEquals(2, result3[0][0])
-
- // Verify data content
- def result4 = sql """
- SELECT id, name
- FROM `${catalog_name}`.`${dbName}`.`${tblName}`
- WHERE pt = 4
- ORDER BY id
- """
- assertEquals(2, result4.size())
- assertEquals(4, result4[0][0])
- assertEquals("hive_pt4", result4[0][1])
- assertEquals(40, result4[1][0])
- assertEquals("doris_pt4", result4[1][1])
-
- logger.info("Cache miss test passed!")
-
- } finally {
- // Clean up
- try {
- hive_docker """DROP TABLE IF EXISTS
${dbName}.${tblName}"""
- hive_docker """DROP DATABASE IF EXISTS ${dbName}"""
- } catch (Exception e) {
- logger.warn("Cleanup failed: ${e.message}")
- }
- }
- }
-
- test_cache_miss()
-
qt_string_partition_table_with_comma """
select * from partition_tables.string_partition_table_with_comma order by id;
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]