This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de32b566630 [enhance](mtmv)Mtmv support audit log (#41499)
de32b566630 is described below

commit de32b56663071f6a9de31acf3d7ac14537a8bb9f
Author: zhangdong <493738...@qq.com>
AuthorDate: Fri Oct 11 14:30:47 2024 +0800

    [enhance](mtmv)Mtmv support audit log (#41499)
    
    MTMV refresh tasks do not have a real SQL statement, so fill stmt with a
    string containing the taskId and partitionNames information.
    
    select * from __internal_schema.audit_log;
    
             query_id: e0c4d330b49742c7-99dcf9903ad94b51
                 time: 2024-09-30 15:19:05.216
            client_ip:
                 user: admin
              catalog: internal
                   db:
                state: OK
           error_code: 0
        error_message:
           query_time: 119
           scan_bytes: 4542464
            scan_rows: 244
          return_rows: 244
              stmt_id: 0
            stmt_type: INSERT
             is_query: 0
          frontend_ip: 127.0.0.1
          cpu_time_ms: 4
             sql_hash: null
           sql_digest:
    peak_memory_bytes: 2829184
       workload_group: normal
                 stmt: Asynchronous materialized view refresh task, taskId: 
260885015586390, partitions refreshed by this insert overwrite: [mv_log]
---
 .../apache/doris/job/extensions/mtmv/MTMVTask.java | 43 +++++++++++++++-------
 .../java/org/apache/doris/mtmv/MTMVPlanUtil.java   |  1 +
 2 files changed, 30 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index a2ec9fb03b0..a22070a0fd1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -46,6 +46,7 @@ import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import 
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
+import org.apache.doris.qe.AuditLogHelper;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
@@ -199,7 +200,7 @@ public class MTMVTask extends AbstractTask {
                 // need get names before exec
                 Map<String, MTMVRefreshPartitionSnapshot> 
execPartitionSnapshots = MTMVPartitionUtil
                         .generatePartitionSnapshots(context, 
relation.getBaseTablesOneLevel(), execPartitionNames);
-                exec(ctx, execPartitionNames, tableWithPartKey);
+                exec(execPartitionNames, tableWithPartKey);
                 completedPartitions.addAll(execPartitionNames);
                 partitionSnapshots.putAll(execPartitionSnapshots);
             }
@@ -214,10 +215,10 @@ public class MTMVTask extends AbstractTask {
         }
     }
 
-    private void exec(ConnectContext ctx, Set<String> refreshPartitionNames,
+    private void exec(Set<String> refreshPartitionNames,
             Map<TableIf, String> tableWithPartKey)
             throws Exception {
-        Objects.requireNonNull(ctx, "ctx should not be null");
+        ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
         TUniqueId queryId = generateQueryId();
@@ -226,20 +227,34 @@ public class MTMVTask extends AbstractTask {
         UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
                 .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != 
MTMVPartitionType.SELF_MANAGE
                         ? refreshPartitionNames : Sets.newHashSet(), 
tableWithPartKey);
-        executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
-        ctx.setExecutor(executor);
-        ctx.setQueryId(queryId);
-        ctx.getState().setNereids(true);
-        command.run(ctx, executor);
-        if (getStatus() == TaskStatus.CANCELED) {
-            // Throwing an exception to interrupt subsequent partition update 
tasks
-            throw new JobException("task is CANCELED");
-        }
-        if (ctx.getState().getStateType() != MysqlStateType.OK) {
-            throw new JobException(ctx.getState().getErrorMessage());
+        try {
+            executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
+            ctx.setExecutor(executor);
+            ctx.setQueryId(queryId);
+            ctx.getState().setNereids(true);
+            command.run(ctx, executor);
+            if (getStatus() == TaskStatus.CANCELED) {
+                // Throwing an exception to interrupt subsequent partition 
update tasks
+                throw new JobException("task is CANCELED");
+            }
+            if (ctx.getState().getStateType() != MysqlStateType.OK) {
+                throw new JobException(ctx.getState().getErrorMessage());
+            }
+        } finally {
+            if (executor != null) {
+                AuditLogHelper.logAuditLog(ctx, 
getDummyStmt(refreshPartitionNames),
+                        executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(), true);
+            }
         }
     }
 
+    private String getDummyStmt(Set<String> refreshPartitionNames) {
+        return String.format(
+                "Asynchronous materialized view refresh task, mvName: %s,"
+                        + "taskId: %s, partitions refreshed by this insert 
overwrite: %s",
+                mtmv.getName(), super.getTaskId(), refreshPartitionNames);
+    }
+
     @Override
     public synchronized void onFail() throws JobException {
         LOG.info("mtmv task onFail, taskId: {}", super.getTaskId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
index 6ba8b63ef58..8cb3a31f2da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -59,6 +59,7 @@ public class MTMVPlanUtil {
         if (workloadGroup.isPresent()) {
             ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
         }
+        ctx.setStartTime();
         return ctx;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to