This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4f98146e83 [enhancement](tracing) Support forward to master tracing (#12290) 4f98146e83 is described below commit 4f98146e83e55f554604c3a99efc27f816c7130b Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Sun Sep 18 17:39:04 2022 +0800 [enhancement](tracing) Support forward to master tracing (#12290) --- .../java/org/apache/doris/qe/ConnectProcessor.java | 51 ++++++++++++++++++++-- .../java/org/apache/doris/qe/MasterOpExecutor.java | 26 ++++++++++- .../java/org/apache/doris/qe/StmtExecutor.java | 46 ++++++++++--------- gensrc/thrift/FrontendService.thrift | 1 + 4 files changed, 95 insertions(+), 29 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 7920ae6dae..3db4b9e036 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -38,6 +38,7 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.telemetry.Telemetry; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.datasource.CatalogIf; @@ -62,8 +63,10 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapGetter; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -73,7 +76,9 @@ import java.io.StringReader; import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousCloseException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -81,10 +86,23 @@ import java.util.UUID; */ public class ConnectProcessor { private static final Logger LOG = LogManager.getLogger(ConnectProcessor.class); + private static final TextMapGetter<Map<String, String>> getter = + new TextMapGetter<Map<String, String>>() { + @Override + public Iterable<String> keys(Map<String, String> carrier) { + return carrier.keySet(); + } + @Override + public String get(Map<String, String> carrier, String key) { + if (carrier.containsKey(key)) { + return carrier.get(key); + } + return ""; + } + }; private final ConnectContext ctx; private ByteBuffer packetBuf; - private StmtExecutor executor = null; public ConnectProcessor(ConnectContext context) { @@ -473,8 +491,8 @@ public class ConnectProcessor { // explain query stmt do not have profile if (executor != null && !executor.getParsedStmt().isExplain() && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile - || executor.getParsedStmt() instanceof LogicalPlanAdapter - || executor.getParsedStmt() instanceof InsertStmt)) { + || executor.getParsedStmt() instanceof LogicalPlanAdapter + || executor.getParsedStmt() instanceof InsertStmt)) { executor.writeProfile(true); } } @@ -543,6 +561,21 @@ public class ConnectProcessor { } } + Map<String, String> traceCarrier = new HashMap<>(); + if (request.isSetTraceCarrier()) { + traceCarrier = request.getTraceCarrier(); + } + Context extractedContext = Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator() + .extract(Context.current(), traceCarrier, getter); + // What we want is for the Traceid to remain unchanged during propagation. + // ctx.initTracer() will be called only if the Context is valid, + // so that the Traceid generated by SDKTracer is the same as the follower. 
Otherwise, + // if the Context is invalid and ctx.initTracer() is called, + // SDKTracer will generate a different Traceid. + if (Span.fromContext(extractedContext).getSpanContext().isValid()) { + ctx.initTracer("master trace"); + } + ctx.setThreadLocalInfo(); StmtExecutor executor = null; try { @@ -557,7 +590,17 @@ public class ConnectProcessor { UUID uuid = UUID.randomUUID(); queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } - executor.execute(queryId); + Span masterQuerySpan = + ctx.getTracer().spanBuilder("master execute").setParent(extractedContext) + .setSpanKind(SpanKind.SERVER).startSpan(); + try (Scope scope = masterQuerySpan.makeCurrent()) { + executor.execute(queryId); + } catch (Exception e) { + masterQuerySpan.recordException(e); + throw e; + } finally { + masterQuerySpan.end(); + } } catch (IOException e) { // Client failed. LOG.warn("Process one query failed because IOException: ", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 896e7c1066..977843a01c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -19,17 +19,23 @@ package org.apache.doris.qe; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.telemetry.Telemetry; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.transport.TTransportException; import 
java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; public class MasterOpExecutor { private static final Logger LOG = LogManager.getLogger(MasterOpExecutor.class); @@ -58,7 +64,17 @@ public class MasterOpExecutor { } public void execute() throws Exception { - forward(); + Span forwardSpan = + ctx.getTracer().spanBuilder("forward").setParent(Context.current()) + .startSpan(); + try (Scope scope = forwardSpan.makeCurrent()) { + forward(); + } catch (Exception e) { + forwardSpan.recordException(e); + throw e; + } finally { + forwardSpan.end(); + } LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs); } @@ -95,6 +111,14 @@ public class MasterOpExecutor { // session variables params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); + // create a trace carrier + Map<String, String> traceCarrier = new HashMap<String, String>(); + // Inject the request with the current context + Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator() + .inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value)); + // carrier send tracing to master + params.setTraceCarrier(traceCarrier); + if (null != ctx.queryId()) { params.setQueryId(ctx.queryId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 93779ad212..c514beee81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -160,7 +160,8 @@ public class StmtExecutor implements ProfileWriter { private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); private static final int MAX_DATA_TO_SEND_FOR_TXN = 100; - + private static final String NULL_VALUE_FOR_LOAD = "\\N"; + private final Object writeProfileLock = new Object(); private ConnectContext 
context; private StatementContext statementContext; private MysqlSerializer serializer; @@ -170,7 +171,6 @@ public class StmtExecutor implements ProfileWriter { private RuntimeProfile profile; private RuntimeProfile summaryProfile; private RuntimeProfile plannerRuntimeProfile; - private final Object writeProfileLock = new Object(); private volatile boolean isFinishedProfile = false; private String queryType = "Query"; private volatile Coordinator coord = null; @@ -181,7 +181,6 @@ public class StmtExecutor implements ProfileWriter { private ShowResultSet proxyResultSet = null; private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; - private QueryPlannerProfile plannerProfile = new QueryPlannerProfile(); // this constructor is mainly for proxy @@ -209,6 +208,21 @@ public class StmtExecutor implements ProfileWriter { this.statementContext.setParsedStatement(parsedStmt); } + public static InternalService.PDataRow getRowStringValue(List<Expr> cols) { + if (cols.size() == 0) { + return null; + } + InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder(); + for (Expr expr : cols) { + if (expr instanceof NullLiteral) { + row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD); + } else { + row.addColBuilder().setValue(expr.getStringValue()); + } + } + return row.build(); + } + public void setCoord(Coordinator coord) { this.coord = coord; } @@ -336,7 +350,7 @@ public class StmtExecutor implements ProfileWriter { /** * Used for audit in ConnectProcessor. - * + * <p> * TODO: There are three interface in StatementBase be called when doing audit: * toDigest needAuditEncryption when parsedStmt is not a query * and isValuesOrConstantSelect when parsedStmt is instance of InsertStmt. @@ -346,7 +360,7 @@ public class StmtExecutor implements ProfileWriter { * isValuesOrConstantSelect: when this interface return true, original string is truncated at 1024 * * @return parsed and analyzed statement for Stale planner. 
- * an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids. + * an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids. */ public StatementBase getParsedStmt() { return parsedStmt; @@ -569,6 +583,7 @@ public class StmtExecutor implements ProfileWriter { /** * get variables in stmt. * TODO: only support select stmt now. need to support Nereids. + * * @throws DdlException */ private void analyzeVariablesInStmt() throws DdlException { @@ -897,7 +912,7 @@ public class StmtExecutor implements ProfileWriter { // the meta fields must be sent right before the first batch of data(or eos flag). // so if it has data(or eos is true), this method must return true. private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCacheValue> cacheValues, - SelectStmt selectStmt, boolean isSendFields, boolean isEos) + SelectStmt selectStmt, boolean isSendFields, boolean isEos) throws Exception { RowBatch batch = null; boolean isSend = isSendFields; @@ -1123,7 +1138,7 @@ public class StmtExecutor implements ProfileWriter { plannerProfile.setQueryFetchResultFinishTime(); } catch (Exception e) { fetchResultSpan.recordException(e); - throw e; + throw e; } finally { fetchResultSpan.end(); } @@ -1331,23 +1346,6 @@ public class StmtExecutor implements ProfileWriter { executor.beginTransaction(request); } - private static final String NULL_VALUE_FOR_LOAD = "\\N"; - - public static InternalService.PDataRow getRowStringValue(List<Expr> cols) { - if (cols.size() == 0) { - return null; - } - InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder(); - for (Expr expr : cols) { - if (expr instanceof NullLiteral) { - row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD); - } else { - row.addColBuilder().setValue(expr.getStringValue()); - } - } - return row.build(); - } - // Process a select statement. 
private void handleInsertStmt() throws Exception { // Every time set no send flag and clean all data in buffer diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index ebb479140a..d126bf4dc5 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -435,6 +435,7 @@ struct TMasterOpRequest { 18: optional i64 insert_visible_timeout_ms // deprecated, move into session_variables 19: optional map<string, string> session_variables 20: optional bool foldConstantByBe + 21: optional map<string, string> trace_carrier } struct TColumnDefinition { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org