This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new c6c2dc20e4 [branch-1.2-lts](cherry-pick) enable strong consistency by 
syncing max journal… (#21804)
c6c2dc20e4 is described below

commit c6c2dc20e418f95f4c0ab0ed40de061ba71ed5e4
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Tue Jul 18 15:13:52 2023 +0800

    [branch-1.2-lts](cherry-pick) enable strong consistency by syncing max 
journal… (#21804)
    
    Add a session var & config enable_strong_consistency_read to solve the 
problem that loading results may be briefly invisible to followers, to meet users' 
requirements in strong consistency read scenarios.
    
    Will sync max journal id from master and wait for replaying.
---
 docs/en/docs/advanced/variables.md                 |   4 +
 docs/zh-CN/docs/advanced/variables.md              |   5 +
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 124 ++++++++++++++++-----
 .../java/org/apache/doris/qe/SessionVariable.java  |   5 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   9 ++
 .../apache/doris/service/FrontendServiceImpl.java  |   5 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 7 files changed, 125 insertions(+), 28 deletions(-)

diff --git a/docs/en/docs/advanced/variables.md 
b/docs/en/docs/advanced/variables.md
index 27769a6fae..7152fa49bb 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -647,6 +647,10 @@ Translated with www.DeepL.com/Translator (free version)
     +--------------+
     ```
 
+* `enable_strong_consistency_read`
+
+  Used to enable strongly consistent reads. By default, Doris supports strong 
consistency within the same session, that is, changes to data within the same 
session are visible in real time. If you want strongly consistent reads across 
sessions, set this variable to true. 
+
 ***
 
 #### Supplementary instructions on statement execution timeout control
diff --git a/docs/zh-CN/docs/advanced/variables.md 
b/docs/zh-CN/docs/advanced/variables.md
index 939f69b6c2..2309389367 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -632,6 +632,11 @@ try (Connection conn = 
DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/
     | 10000000     |
     +--------------+
     ```
+ 
+* `enable_strong_consistency_read`
+
+  用以开启强一致读。Doris 
默认支持同一个会话内的强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。
+
 ***
 
 #### 关于语句执行超时控制的补充说明
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 57e4fe67aa..b3ccdf3bbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.telemetry.Telemetry;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TMasterOpRequest;
@@ -26,11 +27,13 @@ import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.collect.ImmutableMap;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
 
 import java.nio.ByteBuffer;
@@ -63,24 +66,40 @@ public class MasterOpExecutor {
         this.shouldNotRetry = !isQuery;
     }
 
+    /**
+     * used for simply syncing journal with master under strong consistency 
mode
+     */
+    public MasterOpExecutor(ConnectContext ctx) {
+        this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true);
+    }
+
     public void execute() throws Exception {
         Span forwardSpan =
                 
ctx.getTracer().spanBuilder("forward").setParent(Context.current())
                         .startSpan();
-        try (Scope scope = forwardSpan.makeCurrent()) {
-            forward();
+        try (Scope ignored = forwardSpan.makeCurrent()) {
+            result = forward(buildStmtForwardParams());
         } catch (Exception e) {
             forwardSpan.recordException(e);
             throw e;
         } finally {
             forwardSpan.end();
         }
+        waitOnReplaying();
+    }
+
+    public void syncJournal() throws Exception {
+        result = forward(buildSyncJournalParmas());
+        waitOnReplaying();
+    }
+
+    private void waitOnReplaying() throws DdlException {
         LOG.info("forwarding to master get result max journal id: {}", 
result.maxJournalId);
         ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, 
waitTimeoutMs);
     }
 
     // Send request to Master
-    private void forward() throws Exception {
+    private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
         if (!ctx.getEnv().isReady()) {
             throw new Exception("Node catalog is not ready, please wait for a 
while.");
         }
@@ -95,7 +114,50 @@ public class MasterOpExecutor {
             // may throw NullPointerException. add err msg
             throw new Exception("Failed to get master client.", e);
         }
+        final StringBuilder forwardMsg = new 
StringBuilder(String.format("forward to Master %s", thriftAddress));
+        if (!params.isSyncJournalOnly()) {
+            forwardMsg.append(", statement: %s").append(ctx.getStmtId());
+        }
+        LOG.info(forwardMsg.toString());
+
+        boolean isReturnToPool = false;
+        try {
+            final TMasterOpResult result = client.forward(params);
+            isReturnToPool = true;
+            return result;
+        } catch (TTransportException e) {
+            // wrap the raw exception.
+            forwardMsg.append(" : failed");
+            Exception exception = new 
ForwardToMasterException(String.format(forwardMsg.toString()), e);
+
+            boolean ok = ClientPool.frontendPool.reopen(client, 
thriftTimeoutMs);
+            if (!ok) {
+                throw exception;
+            }
+            if (shouldNotRetry || e.getType() == 
TTransportException.TIMED_OUT) {
+                throw exception;
+            } else {
+                LOG.warn(forwardMsg.append(" twice").toString(), e);
+                try {
+                    TMasterOpResult result = client.forward(params);
+                    isReturnToPool = true;
+                    return result;
+                } catch (TException ex) {
+                    throw exception;
+                }
+            }
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, 
client);
+            }
+        }
+    }
+
+    private TMasterOpRequest buildStmtForwardParams() {
         TMasterOpRequest params = new TMasterOpRequest();
+        //node ident
         params.setCluster(ctx.getClusterName());
         params.setSql(originStmt.originStmt);
         params.setStmtIdx(originStmt.idx);
@@ -122,32 +184,14 @@ public class MasterOpExecutor {
         if (null != ctx.queryId()) {
             params.setQueryId(ctx.queryId());
         }
+        return params;
+    }
 
-        LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), 
thriftAddress);
-
-        boolean isReturnToPool = false;
-        try {
-            result = client.forward(params);
-            isReturnToPool = true;
-        } catch (TTransportException e) {
-            boolean ok = ClientPool.frontendPool.reopen(client, 
thriftTimeoutMs);
-            if (!ok) {
-                throw e;
-            }
-            if (shouldNotRetry || e.getType() == 
TTransportException.TIMED_OUT) {
-                throw e;
-            } else {
-                LOG.warn("Forward statement " + ctx.getStmtId() + " to Master 
" + thriftAddress + " twice", e);
-                result = client.forward(params);
-                isReturnToPool = true;
-            }
-        } finally {
-            if (isReturnToPool) {
-                ClientPool.frontendPool.returnObject(thriftAddress, client);
-            } else {
-                ClientPool.frontendPool.invalidateObject(thriftAddress, 
client);
-            }
-        }
+    private TMasterOpRequest buildSyncJournalParmas() {
+        final TMasterOpRequest params = new TMasterOpRequest();
+        //node ident
+        params.setSyncJournalOnly(true);
+        return params;
     }
 
     public ByteBuffer getOutputPacket() {
@@ -190,4 +234,28 @@ public class MasterOpExecutor {
     public void setResult(TMasterOpResult result) {
         this.result = result;
     }
+
+    public static class ForwardToMasterException extends RuntimeException {
+
+        private static final Map<Integer, String> TYPE_MSG_MAP =
+                ImmutableMap.<Integer, String>builder()
+                        .put(TTransportException.UNKNOWN, "Unknown exception")
+                        .put(TTransportException.NOT_OPEN, "Connection is not 
open")
+                        .put(TTransportException.ALREADY_OPEN, "Connection has 
already opened up")
+                        .put(TTransportException.TIMED_OUT, "Connection 
timeout")
+                        .put(TTransportException.END_OF_FILE, "EOF")
+                        .put(TTransportException.CORRUPTED_DATA, "Corrupted 
data")
+                        .build();
+
+        private final String msg;
+
+        public ForwardToMasterException(String msg, TTransportException 
exception) {
+            this.msg = msg + ", cause: " + 
TYPE_MSG_MAP.get(exception.getType());
+        }
+
+        @Override
+        public String getMessage() {
+            return msg;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 83e55452bf..40dfa11e8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -262,6 +262,8 @@ public class SessionVariable implements Serializable, 
Writable {
     // fix replica to query. If num = 1, query the smallest replica, if 2 is 
the second smallest replica.
     public static final String USE_FIX_REPLICA = "use_fix_replica";
 
+    public static final String ENABLE_STRONG_CONSISTENCY = 
"enable_strong_consistency_read";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -689,6 +691,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = USE_FIX_REPLICA)
     public int useFixReplica = -1;
 
+    @VariableMgr.VarAttr(name = ENABLE_STRONG_CONSISTENCY)
+    public boolean enableStrongConsistencyRead = false;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index be97ec39b6..57d63b688c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -627,6 +627,14 @@ public class StmtExecutor implements ProfileWriter {
         }
     }
 
+    private void syncJournalIfNeeded() throws Exception {
+        final Env env = context.getEnv();
+        if (env.isMaster() || 
!context.getSessionVariable().enableStrongConsistencyRead) {
+            return;
+        }
+        new MasterOpExecutor(context).syncJournal();
+    }
+
     /**
      * get variables in stmt.
      *
@@ -1091,6 +1099,7 @@ public class StmtExecutor implements ProfileWriter {
         }
 
         // handle selects that fe can do without be, so we can make sql tools 
happy, especially the setup step.
+        syncJournalIfNeeded();
         if (parsedStmt instanceof SelectStmt && ((SelectStmt) 
parsedStmt).getTableRefs().isEmpty()) {
             SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
             if (handleSelectRequestInFe(parsedSelectStmt)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 638f172660..2c6ac8d11d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -536,6 +536,11 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 throw new TException("request from invalid host was 
rejected.");
             }
         }
+        if (params.isSyncJournalOnly()) {
+            final TMasterOpResult result = new TMasterOpResult();
+            result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+            return result;
+        }
 
         // add this log so that we can track this stmt
         LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), 
clientAddr.getHostname());
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 3634a11333..207f8fb058 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -441,6 +441,7 @@ struct TMasterOpRequest {
     19: optional map<string, string> session_variables
     20: optional bool foldConstantByBe
     21: optional map<string, string> trace_carrier
+    22: optional bool syncJournalOnly // if set to true, this request means to 
do nothing but just sync max journal id of master
 }
 
 struct TColumnDefinition {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to