This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 63cbd725 [Bug] Fix retry of lost tablet when tablet read fails (#502)
63cbd725 is described below

commit 63cbd72550f031ad652707f114a735ba0ea49e14
Author: wudi <676366...@qq.com>
AuthorDate: Fri Oct 25 10:22:09 2024 +0800

    [Bug] Fix retry of lost tablet when tablet read fails (#502)
---
 .../apache/doris/flink/backend/BackendClient.java  | 24 ++++++++++++++++++++--
 .../doris/flink/rest/PartitionDefinition.java      | 16 +++++++++++++++
 .../source/assigners/SimpleSplitAssigner.java      |  6 +++---
 .../flink/source/reader/DorisValueReader.java      | 10 +++++++++
 .../doris/flink/source/reader/ValueReader.java     |  2 +-
 .../doris/flink/table/DorisTableInputSplit.java    | 11 ++++++++++
 6 files changed, 63 insertions(+), 6 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 9881ec5f..be976eff 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -139,10 +139,11 @@ public class BackendClient {
             open();
         }
         TException ex = null;
+        TScanOpenResult result = null;
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to openScanner {}.", attempt, routing);
             try {
-                TScanOpenResult result = client.openScanner(openParams);
+                result = client.openScanner(openParams);
                 if (result == null) {
                     logger.warn("Open scanner result from {} is null.", 
routing);
                     continue;
@@ -155,12 +156,28 @@ public class BackendClient {
                             result.getStatus().getErrorMsgs());
                     continue;
                 }
+                logger.info(
+                        "OpenScanner success for Doris BE '{}' with contextId 
'{}' for tablets '{}'.",
+                        routing,
+                        result.getContextId(),
+                        openParams.tablet_ids);
                 return result;
             } catch (TException e) {
                 logger.warn("Open scanner from {} failed.", routing, e);
                 ex = e;
             }
         }
+        if (result != null && (TStatusCode.OK != 
(result.getStatus().getStatusCode()))) {
+            logger.error(
+                    ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE,
+                    routing,
+                    result.getStatus().getStatusCode(),
+                    result.getStatus().getErrorMsgs());
+            throw new DorisInternalException(
+                    routing.toString(),
+                    result.getStatus().getStatusCode(),
+                    result.getStatus().getErrorMsgs());
+        }
         logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
         throw new ConnectedFailedException(routing.toString(), ex);
     }
@@ -244,7 +261,10 @@ public class BackendClient {
                 logger.warn("Close scanner from {} failed.", routing, e);
             }
         }
-        logger.info("CloseScanner to Doris BE '{}' success.", routing);
+        logger.info(
+                "CloseScanner to Doris BE '{}' success for contextId {} ",
+                routing,
+                closeParams.getContextId());
         close();
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
index dd57a991..9f2bd07d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
@@ -144,4 +144,20 @@ public class PartitionDefinition implements Serializable, 
Comparable<PartitionDe
                 + '\''
                 + '}';
     }
+
+    public String toStringWithoutPlan() {
+        return "PartitionDefinition{"
+                + "database='"
+                + database
+                + '\''
+                + ", table='"
+                + table
+                + '\''
+                + ", beAddress='"
+                + beAddress
+                + '\''
+                + ", tabletIds="
+                + tabletIds
+                + '}';
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
index 7948244c..57a56f53 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java
@@ -45,9 +45,9 @@ public class SimpleSplitAssigner implements 
DorisSplitAssigner {
     }
 
     @Override
-    public void addSplits(Collection<DorisSourceSplit> splits) {
-        LOG.info("Adding splits: {}", splits);
-        splits.addAll(splits);
+    public void addSplits(Collection<DorisSourceSplit> newSplits) {
+        LOG.info("Adding splits: {}", newSplits);
+        splits.addAll(newSplits);
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
index e55e1775..2db4f798 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java
@@ -182,6 +182,11 @@ public class DorisValueReader extends ValueReader 
implements AutoCloseable {
                                         } catch (InterruptedException e) {
                                             throw new DorisRuntimeException(e);
                                         }
+                                    } else {
+                                        LOG.info(
+                                                "Async scan finished , 
tablets: {}, offset: {}",
+                                                partition.getTabletIds(),
+                                                offset);
                                     }
                                 }
                             } finally {
@@ -245,6 +250,11 @@ public class DorisValueReader extends ValueReader 
implements AutoCloseable {
                     eos.set(nextResult.isEos());
                     if (!eos.get()) {
                         rowBatch = new RowBatch(nextResult, 
schema).readArrow();
+                    } else {
+                        LOG.info(
+                                "Scan finished, tablets: {}, offset: {}",
+                                partition.getTabletIds(),
+                                offset);
                     }
                 }
                 hasNext = !eos.get();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
index 9e453934..9e1d091e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java
@@ -34,7 +34,7 @@ public abstract class ValueReader {
             DorisReadOptions readOptions,
             Logger logger)
             throws DorisException {
-        logger.info("create reader for partition: {}", partition);
+        logger.info("create reader for partition: {}", 
partition.toStringWithoutPlan());
         if (readOptions.getUseFlightSql()) {
             return new DorisFlightValueReader(
                     partition,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
index 9620c575..9abe15f2 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
@@ -39,4 +39,15 @@ public class DorisTableInputSplit implements InputSplit, 
java.io.Serializable {
     public int getSplitNumber() {
         return splitNumber;
     }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "DorisTableInputSplit: %s.%s,id=%s,be=%s,tablets=%s",
+                partition.getDatabase(),
+                partition.getTable(),
+                splitNumber,
+                partition.getBeAddress(),
+                partition.getTabletIds());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to