This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9466e3f96d8 [improvement](statistics)Log one bdbje record for one load transaction. #31619 9466e3f96d8 is described below commit 9466e3f96d886ef9c7793ba395841100c3c1f741 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Sat Mar 2 01:00:37 2024 +0800 [improvement](statistics)Log one bdbje record for one load transaction. #31619 --- .../org/apache/doris/datasource/ExternalTable.java | 2 +- .../apache/doris/datasource/InternalCatalog.java | 5 +- .../org/apache/doris/journal/JournalEntity.java | 12 ++ .../java/org/apache/doris/persist/EditLog.java | 20 ++- .../apache/doris/statistics/AnalysisManager.java | 62 ++++++++-- .../doris/statistics/NewPartitionLoadedEvent.java | 22 ++-- .../apache/doris/statistics/UpdateRowsEvent.java | 27 ++-- .../doris/transaction/DatabaseTransactionMgr.java | 6 +- ...est_update_rows_and_partition_first_load.groovy | 137 +++++++++++++++++++++ 9 files changed, 251 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 6732abf2a58..7f82d0d3876 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -199,7 +199,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { try { makeSureInitialized(); } catch (Exception e) { - LOG.warn("Failed to initialize table {}.{}.{}", catalog.name, dbName, name, e); + LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); return 0; } // All external table should get external row count from cache. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index bc2ffc40efc..95abc9e06c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3116,6 +3116,7 @@ public class InternalCatalog implements CatalogIf<Database> { rowsToTruncate += partition.getBaseIndex().getRowCount(); } } else { + rowsToTruncate = olapTable.getRowCount(); for (Partition partition : olapTable.getPartitions()) { // If need absolutely correct, should check running txn here. // But if the txn is in prepare state, cann't known which partitions had load data. @@ -3279,12 +3280,14 @@ public class InternalCatalog implements CatalogIf<Database> { erasePartitionDropBackendReplicas(oldPartitions); + HashMap<Long, Long> updateRecords = new HashMap<>(); + updateRecords.put(olapTable.getId(), rowsToTruncate); if (truncateEntireTable) { // Drop the whole table stats after truncate the entire table Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); } else { // Update the updated rows in table stats after truncate some partitions. 
- Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(olapTable.getId(), rowsToTruncate); + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); } LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 2c3e37ed022..8f0d2b54580 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -127,7 +127,9 @@ import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.workloadgroup.WorkloadGroup; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy; import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.NewPartitionLoadedEvent; import org.apache.doris.statistics.TableStatsMeta; +import org.apache.doris.statistics.UpdateRowsEvent; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -935,6 +937,16 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_LOG_UPDATE_ROWS: { + data = UpdateRowsEvent.read(in); + isRead = true; + break; + } + case OperationType.OP_LOG_NEW_PARTITION_LOADED: { + data = NewPartitionLoadedEvent.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 88675d5cca3..d21012dd658 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -88,7 +88,9 @@ import org.apache.doris.statistics.AnalysisInfo; import 
org.apache.doris.statistics.AnalysisJobInfo; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.NewPartitionLoadedEvent; import org.apache.doris.statistics.TableStatsMeta; +import org.apache.doris.statistics.UpdateRowsEvent; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; import org.apache.doris.transaction.TransactionState; @@ -1181,8 +1183,14 @@ public class EditLog { env.getExternalMetaIdMgr().replayMetaIdMappingsLog((MetaIdMappingsLog) journal.getData()); break; } - case OperationType.OP_LOG_UPDATE_ROWS: - case OperationType.OP_LOG_NEW_PARTITION_LOADED: + case OperationType.OP_LOG_UPDATE_ROWS: { + env.getAnalysisManager().replayUpdateRowsRecord((UpdateRowsEvent) journal.getData()); + break; + } + case OperationType.OP_LOG_NEW_PARTITION_LOADED: { + env.getAnalysisManager().replayNewPartitionLoadedEvent((NewPartitionLoadedEvent) journal.getData()); + break; + } case OperationType.OP_LOG_ALTER_COLUMN_STATS: { // TODO: implement this while statistics finished related work. 
break; @@ -2022,6 +2030,14 @@ public class EditLog { logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats); } + public void logUpdateRowsRecord(UpdateRowsEvent record) { + logEdit(OperationType.OP_LOG_UPDATE_ROWS, record); + } + + public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) { + logEdit(OperationType.OP_LOG_NEW_PARTITION_LOADED, event); + } + public void logDeleteTableStats(TableStatsDeletionLog log) { logEdit(OperationType.OP_DELETE_TABLE_STATS, log); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index eafcff3e7f2..b265b88f702 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -996,21 +996,31 @@ public class AnalysisManager implements Writable { } // Invoke this when load transaction finished. - public void updateUpdatedRows(long tblId, long rows) { - TableStatsMeta statsStatus = idToTblStats.get(tblId); - if (statsStatus != null) { - statsStatus.updatedRows.addAndGet(rows); - logCreateTableStats(statsStatus); + public void updateUpdatedRows(Map<Long, Long> records) { + if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || records == null || records.isEmpty()) { + return; + } + for (Entry<Long, Long> record : records.entrySet()) { + TableStatsMeta statsStatus = idToTblStats.get(record.getKey()); + if (statsStatus != null) { + statsStatus.updatedRows.addAndGet(record.getValue()); + } } + logUpdateRowsRecord(new UpdateRowsEvent(records)); } // Set to true means new partition loaded data - public void setNewPartitionLoaded(long tblId) { - TableStatsMeta statsStatus = idToTblStats.get(tblId); - if (statsStatus != null && Env.getCurrentEnv().isMaster() && !Env.isCheckpointThread()) { - statsStatus.newPartitionLoaded.set(true); - logCreateTableStats(statsStatus); + public void 
setNewPartitionLoaded(List<Long> tableIds) { + if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || tableIds == null || tableIds.isEmpty()) { + return; + } + for (long tableId : tableIds) { + TableStatsMeta statsStatus = idToTblStats.get(tableId); + if (statsStatus != null) { + statsStatus.newPartitionLoaded.set(true); + } } + logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds)); } public void updateTableStatsStatus(TableStatsMeta tableStats) { @@ -1026,6 +1036,38 @@ public class AnalysisManager implements Writable { Env.getCurrentEnv().getEditLog().logCreateTableStats(tableStats); } + public void logUpdateRowsRecord(UpdateRowsEvent record) { + Env.getCurrentEnv().getEditLog().logUpdateRowsRecord(record); + } + + public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) { + Env.getCurrentEnv().getEditLog().logNewPartitionLoadedEvent(event); + } + + public void replayUpdateRowsRecord(UpdateRowsEvent event) { + if (event == null || event.getRecords() == null) { + return; + } + for (Entry<Long, Long> record : event.getRecords().entrySet()) { + TableStatsMeta statsStatus = idToTblStats.get(record.getKey()); + if (statsStatus != null) { + statsStatus.updatedRows.addAndGet(record.getValue()); + } + } + } + + public void replayNewPartitionLoadedEvent(NewPartitionLoadedEvent event) { + if (event == null || event.getTableIds() == null) { + return; + } + for (long tableId : event.getTableIds()) { + TableStatsMeta statsStatus = idToTblStats.get(tableId); + if (statsStatus != null) { + statsStatus.newPartitionLoaded.set(true); + } + } + } + public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> taskInfos) { recordAnalysisJob(jobInfo); analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java index d09cb2df6c4..891eafd2dda 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/NewPartitionLoadedEvent.java @@ -27,20 +27,25 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.util.List; public class NewPartitionLoadedEvent implements Writable { - @SerializedName("partitionIdToTableId") - public final Map<Long, Long> partitionIdToTableId = new HashMap<>(); + @SerializedName("tableIds") + private List<Long> tableIds; @VisibleForTesting - public NewPartitionLoadedEvent() {} + public NewPartitionLoadedEvent(List<Long> tableIds) { + this.tableIds = tableIds; + } // No need to be thread safe, only publish thread will call this. - public void addPartition(long tableId, long partitionId) { - partitionIdToTableId.put(tableId, partitionId); + public void addTableId(long tableId) { + tableIds.add(tableId); + } + + public List<Long> getTableIds() { + return tableIds; } @Override @@ -51,7 +56,6 @@ public class NewPartitionLoadedEvent implements Writable { public static NewPartitionLoadedEvent read(DataInput dataInput) throws IOException { String json = Text.readString(dataInput); - NewPartitionLoadedEvent newPartitionLoadedEvent = GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class); - return newPartitionLoadedEvent; + return GsonUtils.GSON.fromJson(json, NewPartitionLoadedEvent.class); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java index 04f185c8b73..8cce3d29391 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java @@ -21,30 +21,24 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import 
org.apache.doris.persist.gson.GsonUtils; -import com.google.common.annotations.VisibleForTesting; import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; import java.util.Map; public class UpdateRowsEvent implements Writable { - @SerializedName("tableIdToUpdateRows") - public final Map<Long, Long> tableIdToUpdateRows = new HashMap<>(); + @SerializedName("records") + private Map<Long, Long> records; - @VisibleForTesting - public UpdateRowsEvent() {} + public UpdateRowsEvent(Map<Long, Long> records) { + this.records = records; + } - // No need to be thread safe, only publish thread will call this. - public void addUpdateRows(long tableId, long rows) { - if (tableIdToUpdateRows.containsKey(tableId)) { - tableIdToUpdateRows.put(tableId, tableIdToUpdateRows.get(tableId) + rows); - } else { - tableIdToUpdateRows.put(tableId, rows); - } + public Map<Long, Long> getRecords() { + return records; } @Override @@ -53,9 +47,8 @@ public class UpdateRowsEvent implements Writable { Text.writeString(out, json); } - public static UpdateRowsEvent read(DataInput dataInput) throws IOException { - String json = Text.readString(dataInput); - UpdateRowsEvent updateRowsEvent = GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class); - return updateRowsEvent; + public static UpdateRowsEvent read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, UpdateRowsEvent.class); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 0e5608791e7..af94917f97d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1912,6 +1912,7 @@ public class DatabaseTransactionMgr { private boolean 
updateCatalogAfterVisible(TransactionState transactionState, Database db) { Set<Long> errorReplicaIds = transactionState.getErrorReplicas(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + List<Long> newPartitionLoadedTableIds = new ArrayList<>(); for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { long tableId = tableCommitInfo.getTableId(); OlapTable table = (OlapTable) db.getTableNullable(tableId); @@ -1973,7 +1974,7 @@ public class DatabaseTransactionMgr { long versionTime = partitionCommitInfo.getVersionTime(); if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION && version > Partition.PARTITION_INIT_VERSION) { - analysisManager.setNewPartitionLoaded(tableId); + newPartitionLoadedTableIds.add(tableId); } partition.updateVisibleVersionAndTime(version, versionTime); if (LOG.isDebugEnabled()) { @@ -2000,7 +2001,8 @@ public class DatabaseTransactionMgr { if (LOG.isDebugEnabled()) { LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows); } - tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows); + analysisManager.setNewPartitionLoaded(newPartitionLoadedTableIds); + analysisManager.updateUpdatedRows(tableIdToNumDeltaRows); return true; } diff --git a/regression-test/suites/statistics/test_update_rows_and_partition_first_load.groovy b/regression-test/suites/statistics/test_update_rows_and_partition_first_load.groovy new file mode 100644 index 00000000000..5bfa58abd62 --- /dev/null +++ b/regression-test/suites/statistics/test_update_rows_and_partition_first_load.groovy @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_update_rows_and_partition_first_load", "p2") { + + String ak = getS3AK() + String sk = getS3SK() + String enabled = context.config.otherConfigs.get("enableBrokerLoad") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + sql """DROP DATABASE IF EXISTS test_update_rows_and_partition_first_load""" + sql """CREATE DATABASE test_update_rows_and_partition_first_load""" + sql """use test_update_rows_and_partition_first_load""" + sql """ + CREATE TABLE update_rows_test1 ( + id int NULL, + name String NULL + )ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + CREATE TABLE update_rows_test2 ( + id int NULL, + name String NULL + )ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + CREATE TABLE `partition_test1` ( + `id` INT NOT NULL, + `name` VARCHAR(25) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + PARTITION BY RANGE(`id`) + (PARTITION p1 VALUES [("0"), ("100")), + PARTITION p2 VALUES [("100"), ("200")), + PARTITION p3 VALUES [("200"), ("300"))) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + + sql """ + CREATE TABLE `partition_test2` ( + `id` INT NOT NULL, + `name` VARCHAR(25) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' 
+ PARTITION BY RANGE(`id`) + (PARTITION p1 VALUES [("0"), ("100")), + PARTITION p2 VALUES [("100"), ("200")), + PARTITION p3 VALUES [("200"), ("300"))) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + + sql """analyze table update_rows_test1 with sync""" + sql """analyze table update_rows_test2 with sync""" + sql """analyze table partition_test1 with sync""" + sql """analyze table partition_test2 with sync""" + + def label = "part_" + UUID.randomUUID().toString().replace("-", "0") + sql """ + LOAD LABEL ${label} ( + DATA INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_1.csv") + INTO TABLE update_rows_test1 + COLUMNS TERMINATED BY ",", + DATA INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_2.csv") + INTO TABLE update_rows_test2 + COLUMNS TERMINATED BY ",", + DATA INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_1.csv") + INTO TABLE partition_test1 + COLUMNS TERMINATED BY ",", + DATA INFILE("s3://doris-build-1308700295/regression/load/data/update_rows_2.csv") + INTO TABLE partition_test2 + COLUMNS TERMINATED BY "," + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + "AWS_REGION" = "ap-beijing" + ); + """ + + boolean finished = false; + for(int i = 0; i < 120; i++) { + def result = sql """show load where label = "$label" """ + if (result[0][2] == "FINISHED") { + finished = true; + break; + } + logger.info("Load not finished, wait one second.") + Thread.sleep(1000) + } + if (finished) { + def result = sql """show table stats update_rows_test1""" + assertEquals("5", result[0][0]) + result = sql """show table stats update_rows_test2""" + assertEquals("6", result[0][0]) + result = sql """show table stats partition_test1""" + assertEquals("5", result[0][0]) + assertEquals("true", result[0][6]) + result = sql """show table stats partition_test2""" + assertEquals("true", result[0][6]) + assertEquals("6", 
result[0][0]) + } + sql """DROP DATABASE IF EXISTS test_update_rows_and_partition_first_load""" + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org