This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new de32b566630 [enhance](mtmv)Mtmv support audit log (#41499) de32b566630 is described below commit de32b56663071f6a9de31acf3d7ac14537a8bb9f Author: zhangdong <493738...@qq.com> AuthorDate: Fri Oct 11 14:30:47 2024 +0800 [enhance](mtmv)Mtmv support audit log (#41499) An MTMV refresh task does not have a real SQL statement, so the audit log's stmt field is filled with a string containing the taskId and partitionNames information. Example: select * from __internal_schema.audit_log; query_id: e0c4d330b49742c7-99dcf9903ad94b51 time: 2024-09-30 15:19:05.216 client_ip: user: admin catalog: internal db: state: OK error_code: 0 error_message: query_time: 119 scan_bytes: 4542464 scan_rows: 244 return_rows: 244 stmt_id: 0 stmt_type: INSERT is_query: 0 frontend_ip: 127.0.0.1 cpu_time_ms: 4 sql_hash: null sql_digest: peak_memory_bytes: 2829184 workload_group: normal stmt: Asynchronous materialized view refresh task, taskId: 260885015586390, partitions refreshed by this insert overwrite: [mv_log] --- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 43 +++++++++++++++------- .../java/org/apache/doris/mtmv/MTMVPlanUtil.java | 1 + 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index a2ec9fb03b0..a22070a0fd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -46,6 +46,7 @@ import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; +import org.apache.doris.qe.AuditLogHelper; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import 
org.apache.doris.qe.StmtExecutor; @@ -199,7 +200,7 @@ public class MTMVTask extends AbstractTask { // need get names before exec Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil .generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames); - exec(ctx, execPartitionNames, tableWithPartKey); + exec(execPartitionNames, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); } @@ -214,10 +215,10 @@ public class MTMVTask extends AbstractTask { } } - private void exec(ConnectContext ctx, Set<String> refreshPartitionNames, + private void exec(Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws Exception { - Objects.requireNonNull(ctx, "ctx should not be null"); + ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); @@ -226,20 +227,34 @@ public class MTMVTask extends AbstractTask { UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? 
refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); - executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); - ctx.setExecutor(executor); - ctx.setQueryId(queryId); - ctx.getState().setNereids(true); - command.run(ctx, executor); - if (getStatus() == TaskStatus.CANCELED) { - // Throwing an exception to interrupt subsequent partition update tasks - throw new JobException("task is CANCELED"); - } - if (ctx.getState().getStateType() != MysqlStateType.OK) { - throw new JobException(ctx.getState().getErrorMessage()); + try { + executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); + ctx.setExecutor(executor); + ctx.setQueryId(queryId); + ctx.getState().setNereids(true); + command.run(ctx, executor); + if (getStatus() == TaskStatus.CANCELED) { + // Throwing an exception to interrupt subsequent partition update tasks + throw new JobException("task is CANCELED"); + } + if (ctx.getState().getStateType() != MysqlStateType.OK) { + throw new JobException(ctx.getState().getErrorMessage()); + } + } finally { + if (executor != null) { + AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames), + executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); + } } } + private String getDummyStmt(Set<String> refreshPartitionNames) { + return String.format( + "Asynchronous materialized view refresh task, mvName: %s," + + "taskId: %s, partitions refreshed by this insert overwrite: %s", + mtmv.getName(), super.getTaskId(), refreshPartitionNames); + } + @Override public synchronized void onFail() throws JobException { LOG.info("mtmv task onFail, taskId: {}", super.getTaskId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java index 6ba8b63ef58..8cb3a31f2da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java @@ -59,6 +59,7 @@ public class MTMVPlanUtil { if (workloadGroup.isPresent()) { ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get()); } + ctx.setStartTime(); return ctx; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org