This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f98146e83 [enhancement](tracing) Support forward to master tracing (#12290)
4f98146e83 is described below

commit 4f98146e83e55f554604c3a99efc27f816c7130b
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Sun Sep 18 17:39:04 2022 +0800

    [enhancement](tracing) Support forward to master tracing (#12290)
---
 .../java/org/apache/doris/qe/ConnectProcessor.java | 51 ++++++++++++++++++++--
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 26 ++++++++++-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 46 ++++++++++---------
 gensrc/thrift/FrontendService.thrift               |  1 +
 4 files changed, 95 insertions(+), 29 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 7920ae6dae..3db4b9e036 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.datasource.CatalogIf;
@@ -62,8 +63,10 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapGetter;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -73,7 +76,9 @@ import java.io.StringReader;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -81,10 +86,23 @@ import java.util.UUID;
  */
 public class ConnectProcessor {
     private static final Logger LOG = 
LogManager.getLogger(ConnectProcessor.class);
+    private static final TextMapGetter<Map<String, String>> getter =
+            new TextMapGetter<Map<String, String>>() {
+                @Override
+                public Iterable<String> keys(Map<String, String> carrier) {
+                    return carrier.keySet();
+                }
 
+                @Override
+                public String get(Map<String, String> carrier, String key) {
+                    if (carrier.containsKey(key)) {
+                        return carrier.get(key);
+                    }
+                    return "";
+                }
+            };
     private final ConnectContext ctx;
     private ByteBuffer packetBuf;
-
     private StmtExecutor executor = null;
 
     public ConnectProcessor(ConnectContext context) {
@@ -473,8 +491,8 @@ public class ConnectProcessor {
         // explain query stmt do not have profile
         if (executor != null && !executor.getParsedStmt().isExplain()
                 && (executor.getParsedStmt() instanceof QueryStmt // currently 
only QueryStmt and insert need profile
-                    || executor.getParsedStmt() instanceof LogicalPlanAdapter
-                    || executor.getParsedStmt() instanceof InsertStmt)) {
+                || executor.getParsedStmt() instanceof LogicalPlanAdapter
+                || executor.getParsedStmt() instanceof InsertStmt)) {
             executor.writeProfile(true);
         }
     }
@@ -543,6 +561,21 @@ public class ConnectProcessor {
             }
         }
 
+        Map<String, String> traceCarrier = new HashMap<>();
+        if (request.isSetTraceCarrier()) {
+            traceCarrier = request.getTraceCarrier();
+        }
+        Context extractedContext = 
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
+                .extract(Context.current(), traceCarrier, getter);
+        // What we want is for the Traceid to remain unchanged during propagation.
+        // ctx.initTracer() will be called only if the Context is valid,
+        // so that the Traceid generated by SDKTracer is the same as the follower. Otherwise,
+        // if the Context is invalid and ctx.initTracer() is called,
+        // SDKTracer will generate a different Traceid.
+        if (Span.fromContext(extractedContext).getSpanContext().isValid()) {
+            ctx.initTracer("master trace");
+        }
+
         ctx.setThreadLocalInfo();
         StmtExecutor executor = null;
         try {
@@ -557,7 +590,17 @@ public class ConnectProcessor {
                 UUID uuid = UUID.randomUUID();
                 queryId = new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
             }
-            executor.execute(queryId);
+            Span masterQuerySpan =
+                    ctx.getTracer().spanBuilder("master 
execute").setParent(extractedContext)
+                            .setSpanKind(SpanKind.SERVER).startSpan();
+            try (Scope scope = masterQuerySpan.makeCurrent()) {
+                executor.execute(queryId);
+            } catch (Exception e) {
+                masterQuerySpan.recordException(e);
+                throw e;
+            } finally {
+                masterQuerySpan.end();
+            }
         } catch (IOException e) {
             // Client failed.
             LOG.warn("Process one query failed because IOException: ", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 896e7c1066..977843a01c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -19,17 +19,23 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.transport.TTransportException;
 
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 public class MasterOpExecutor {
     private static final Logger LOG = 
LogManager.getLogger(MasterOpExecutor.class);
@@ -58,7 +64,17 @@ public class MasterOpExecutor {
     }
 
     public void execute() throws Exception {
-        forward();
+        Span forwardSpan =
+                
ctx.getTracer().spanBuilder("forward").setParent(Context.current())
+                        .startSpan();
+        try (Scope scope = forwardSpan.makeCurrent()) {
+            forward();
+        } catch (Exception e) {
+            forwardSpan.recordException(e);
+            throw e;
+        } finally {
+            forwardSpan.end();
+        }
         LOG.info("forwarding to master get result max journal id: {}", 
result.maxJournalId);
         ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, 
waitTimeoutMs);
     }
@@ -95,6 +111,14 @@ public class MasterOpExecutor {
         // session variables
         
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
 
+        // create a trace carrier
+        Map<String, String> traceCarrier = new HashMap<String, String>();
+        // Inject the request with the current context
+        Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
+                .inject(Context.current(), traceCarrier, (carrier, key, value) 
-> carrier.put(key, value));
+        // carrier send tracing to master
+        params.setTraceCarrier(traceCarrier);
+
         if (null != ctx.queryId()) {
             params.setQueryId(ctx.queryId());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 93779ad212..c514beee81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -160,7 +160,8 @@ public class StmtExecutor implements ProfileWriter {
 
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
     private static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
-
+    private static final String NULL_VALUE_FOR_LOAD = "\\N";
+    private final Object writeProfileLock = new Object();
     private ConnectContext context;
     private StatementContext statementContext;
     private MysqlSerializer serializer;
@@ -170,7 +171,6 @@ public class StmtExecutor implements ProfileWriter {
     private RuntimeProfile profile;
     private RuntimeProfile summaryProfile;
     private RuntimeProfile plannerRuntimeProfile;
-    private final Object writeProfileLock = new Object();
     private volatile boolean isFinishedProfile = false;
     private String queryType = "Query";
     private volatile Coordinator coord = null;
@@ -181,7 +181,6 @@ public class StmtExecutor implements ProfileWriter {
     private ShowResultSet proxyResultSet = null;
     private Data.PQueryStatistics.Builder statisticsForAuditLog;
     private boolean isCached;
-
     private QueryPlannerProfile plannerProfile = new QueryPlannerProfile();
 
     // this constructor is mainly for proxy
@@ -209,6 +208,21 @@ public class StmtExecutor implements ProfileWriter {
         this.statementContext.setParsedStatement(parsedStmt);
     }
 
+    public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
+        if (cols.size() == 0) {
+            return null;
+        }
+        InternalService.PDataRow.Builder row = 
InternalService.PDataRow.newBuilder();
+        for (Expr expr : cols) {
+            if (expr instanceof NullLiteral) {
+                row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
+            } else {
+                row.addColBuilder().setValue(expr.getStringValue());
+            }
+        }
+        return row.build();
+    }
+
     public void setCoord(Coordinator coord) {
         this.coord = coord;
     }
@@ -336,7 +350,7 @@ public class StmtExecutor implements ProfileWriter {
 
     /**
      * Used for audit in ConnectProcessor.
-     *
+     * <p>
      * TODO: There are three interface in StatementBase be called when doing audit:
      *      toDigest needAuditEncryption when parsedStmt is not a query
      *      and isValuesOrConstantSelect when parsedStmt is instance of InsertStmt.
@@ -346,7 +360,7 @@ public class StmtExecutor implements ProfileWriter {
      *      isValuesOrConstantSelect: when this interface return true, original string is truncated at 1024
      *
      * @return parsed and analyzed statement for Stale planner.
-     *          an unresolved LogicalPlan wrapped with a LogicalPlanAdapter 
for Nereids.
+     * an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
      */
     public StatementBase getParsedStmt() {
         return parsedStmt;
@@ -569,6 +583,7 @@ public class StmtExecutor implements ProfileWriter {
     /**
      * get variables in stmt.
      * TODO: only support select stmt now. need to support Nereids.
+     *
      * @throws DdlException
      */
     private void analyzeVariablesInStmt() throws DdlException {
@@ -897,7 +912,7 @@ public class StmtExecutor implements ProfileWriter {
     // the meta fields must be sent right before the first batch of data(or eos flag).
     // so if it has data(or eos is true), this method must return true.
     private boolean sendCachedValues(MysqlChannel channel, 
List<InternalService.PCacheValue> cacheValues,
-                                     SelectStmt selectStmt, boolean 
isSendFields, boolean isEos)
+            SelectStmt selectStmt, boolean isSendFields, boolean isEos)
             throws Exception {
         RowBatch batch = null;
         boolean isSend = isSendFields;
@@ -1123,7 +1138,7 @@ public class StmtExecutor implements ProfileWriter {
             plannerProfile.setQueryFetchResultFinishTime();
         } catch (Exception e) {
             fetchResultSpan.recordException(e);
-            throw  e;
+            throw e;
         } finally {
             fetchResultSpan.end();
         }
@@ -1331,23 +1346,6 @@ public class StmtExecutor implements ProfileWriter {
         executor.beginTransaction(request);
     }
 
-    private static final String NULL_VALUE_FOR_LOAD = "\\N";
-
-    public static InternalService.PDataRow getRowStringValue(List<Expr> cols) {
-        if (cols.size() == 0) {
-            return null;
-        }
-        InternalService.PDataRow.Builder row = 
InternalService.PDataRow.newBuilder();
-        for (Expr expr : cols) {
-            if (expr instanceof NullLiteral) {
-                row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
-            } else {
-                row.addColBuilder().setValue(expr.getStringValue());
-            }
-        }
-        return row.build();
-    }
-
     // Process a select statement.
     private void handleInsertStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index ebb479140a..d126bf4dc5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -435,6 +435,7 @@ struct TMasterOpRequest {
     18: optional i64 insert_visible_timeout_ms // deprecated, move into 
session_variables
     19: optional map<string, string> session_variables
     20: optional bool foldConstantByBe
+    21: optional map<string, string> trace_carrier
 }
 
 struct TColumnDefinition {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to